diff --git a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h index 64e629caed..e5899bf9ac 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_isql_connection.h @@ -80,14 +80,18 @@ public: // sql execute interface virtual int execute_read(const uint64_t tenant_id, const char *sql, ObISQLClient::ReadResult &res, bool is_user_sql = false, - bool is_from_pl = false) = 0; + bool is_from_pl = false, + const common::ObAddr *sql_exec_addr = nullptr) = 0; virtual int execute_read(const int64_t cluster_id, const uint64_t tenant_id, const ObString &sql, ObISQLClient::ReadResult &res, bool is_user_sql = false, - bool is_from_pl = false) = 0; + bool is_from_pl = false, + const common::ObAddr *sql_exec_addr = nullptr) = 0; virtual int execute_write(const uint64_t tenant_id, const char *sql, - int64_t &affected_rows, bool is_user_sql = false) = 0; + int64_t &affected_rows, bool is_user_sql = false, + const common::ObAddr *sql_exec_addr = nullptr) = 0; virtual int execute_write(const uint64_t tenant_id, const ObString &sql, - int64_t &affected_rows, bool is_user_sql = false) = 0; + int64_t &affected_rows, bool is_user_sql = false, + const common::ObAddr *sql_exec_addr = nullptr) = 0; // transaction interface virtual int start_transaction(const uint64_t &tenant_id, bool with_snap_shot = false) = 0; @@ -116,6 +120,7 @@ public: virtual void set_nls_formats(const ObString *nls_formats) { UNUSED(nls_formats); } virtual void set_is_load_data_exec(bool v) { UNUSED(v); } virtual void set_force_remote_exec(bool v) { UNUSED(v); } + virtual void set_use_external_session(bool v) { UNUSED(v); } virtual int64_t get_cluster_id() const { return common::OB_INVALID_ID; } void set_init_remote_env(bool flag) { is_init_remote_env_ = flag;} bool get_init_remote_env() const { return is_init_remote_env_; } diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp index ac5e54bc3f..5a4d076dfa 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp @@ -372,16 +372,17 @@ int ObMySQLConnection::switch_tenant(const uint64_t tenant_id) } int ObMySQLConnection::execute_write(const uint64_t tenant_id, const ObString &sql, - int64_t &affected_rows, bool is_user_sql) + int64_t &affected_rows, bool is_user_sql, const common::ObAddr *sql_exec_addr) { - UNUSEDx(tenant_id, sql, affected_rows, is_user_sql); + UNUSEDx(tenant_id, sql, affected_rows, is_user_sql, sql_exec_addr); return OB_NOT_SUPPORTED; } int ObMySQLConnection::execute_write(const uint64_t tenant_id, const char *sql, - int64_t &affected_rows, bool is_user_sql) + int64_t &affected_rows, bool is_user_sql, const common::ObAddr *sql_exec_addr) { UNUSED(is_user_sql); + UNUSED(sql_exec_addr); int ret = OB_SUCCESS; if (OB_UNLIKELY(closed_)) { ret = OB_NOT_INIT; @@ -398,17 +399,18 @@ int ObMySQLConnection::execute_write(const uint64_t tenant_id, const char *sql, } int ObMySQLConnection::execute_read(const int64_t cluster_id, const uint64_t tenant_id, - const ObString &sql, ObISQLClient::ReadResult &res, bool is_user_sql, bool is_from_pl) + const ObString &sql, ObISQLClient::ReadResult &res, bool is_user_sql, bool is_from_pl, const common::ObAddr *sql_exec_addr) { - UNUSEDx(cluster_id, tenant_id, sql, res, is_user_sql, is_from_pl); + UNUSEDx(cluster_id, tenant_id, sql, res, is_user_sql, is_from_pl, sql_exec_addr); return OB_NOT_SUPPORTED; } int ObMySQLConnection::execute_read(const uint64_t tenant_id, const char *sql, - ObISQLClient::ReadResult &res, bool is_user_sql, bool is_from_pl) + ObISQLClient::ReadResult &res, bool is_user_sql, bool is_from_pl, const common::ObAddr *sql_exec_addr) { UNUSED(is_user_sql); UNUSED(is_from_pl); + UNUSED(sql_exec_addr); int ret = OB_SUCCESS; ObMySQLReadContext *read_ctx = NULL; if (OB_UNLIKELY(closed_)) { diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h index a2b5123529..74ba9497e0 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.h @@ -68,14 +68,21 @@ public: virtual int execute_read(const uint64_t tenant_id, const char *sql, ObISQLClient::ReadResult &res, bool is_user_sql = false, - bool is_from_pl = false) override; + bool is_from_pl = false, + const common::ObAddr *sql_exec_addr = nullptr) override; + virtual int execute_read(const int64_t cluster_id, const uint64_t tenant_id, const ObString &sql, ObISQLClient::ReadResult &res, bool is_user_sql = false, - bool is_from_pl = false) override; + bool is_from_pl = false, + const common::ObAddr *sql_exec_addr = nullptr) override; + virtual int execute_write(const uint64_t tenant_id, const ObString &sql, - int64_t &affected_rows, bool is_user_sql = false) override; + int64_t &affected_rows, bool is_user_sql = false, + const common::ObAddr *sql_exec_addr = nullptr) override; + virtual int execute_write(const uint64_t tenant_id, const char *sql, - int64_t &affected_rows, bool is_user_sql = false) override; + int64_t &affected_rows, bool is_user_sql = false, + const common::ObAddr *sql_exec_addr = nullptr) override; virtual int start_transaction(const uint64_t &tenant_id, bool with_snap_shot = false) override; virtual int rollback() override; diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp index 6817bed462..27eeccb671 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.cpp @@ -66,6 +66,22 @@ int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const c return ret; } +int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const char *sql, const common::ObAddr *exec_sql_addr) +{ + int ret = OB_SUCCESS; + ObISQLConnection *conn = NULL; + if (OB_ISNULL(exec_sql_addr)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("read with typically exec addr failed", K(ret), K(exec_sql_addr)); + } else if (OB_FAIL(acquire(tenant_id, conn))) { + LOG_WARN("acquire connection failed", K(ret), K(conn)); + } else if (OB_FAIL(read(conn, result, tenant_id, sql, exec_sql_addr))) { + LOG_WARN("read failed", K(ret)); + } + close(conn, ret); + return ret; +} + int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const char *sql, const ObSessionParam *session_param) { int ret = OB_SUCCESS; @@ -74,6 +90,7 @@ int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const c LOG_WARN("acquire connection failed", K(ret), K(conn)); } else if (nullptr != session_param) { conn->set_ddl_info(&session_param->ddl_info_); + conn->set_use_external_session(session_param->use_external_session_); if (nullptr != session_param->sql_mode_) { if (OB_FAIL(conn->set_session_variable("sql_mode", *session_param->sql_mode_))) { LOG_WARN("set inner connection sql mode failed", K(ret)); @@ -92,7 +109,7 @@ int ObCommonSqlProxy::read(ReadResult &result, const uint64_t tenant_id, const c } int ObCommonSqlProxy::read(ObISQLConnection *conn, ReadResult &result, - const uint64_t tenant_id, const char *sql) + const uint64_t tenant_id, const char *sql, const common::ObAddr *exec_sql_addr) { int ret = OB_SUCCESS; const int64_t start = ::oceanbase::common::ObTimeUtility::current_time(); @@ -104,7 +121,7 @@ int ObCommonSqlProxy::read(ObISQLConnection *conn, ReadResult &result, ret = OB_INACTIVE_SQL_CLIENT; LOG_WARN("in active sql client", K(ret), KCSTRING(sql)); } else { - if (OB_FAIL(conn->execute_read(tenant_id, sql, result))) { + if (OB_FAIL(conn->execute_read(tenant_id, sql, result, exec_sql_addr))) { LOG_WARN("query failed", K(ret), K(conn), K(start), KCSTRING(sql)); } } @@ -139,7 +156,9 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const char *sql, int64_t & } int ObCommonSqlProxy::write(const uint64_t tenant_id, const ObString sql, - int64_t &affected_rows, int64_t compatibility_mode, const ObSessionParam *param /* = nullptr*/) + int64_t &affected_rows, int64_t compatibility_mode, + const ObSessionParam *param /* = nullptr*/, + const common::ObAddr *sql_exec_addr) { int ret = OB_SUCCESS; bool is_user_sql = false; @@ -177,6 +196,7 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const ObString sql, } if (OB_SUCC(ret) && nullptr != param) { conn->set_is_load_data_exec(param->is_load_data_exec_); + conn->set_use_external_session(param->use_external_session_); if (param->is_load_data_exec_) { is_user_sql = true; } @@ -202,8 +222,8 @@ int ObCommonSqlProxy::write(const uint64_t tenant_id, const ObString sql, } } if (OB_SUCC(ret)) { - if (OB_FAIL(conn->execute_write(tenant_id, sql, affected_rows, is_user_sql))) { - LOG_WARN("execute sql failed", K(ret), K(conn), K(start), K(sql)); + if (OB_FAIL(conn->execute_write(tenant_id, sql, affected_rows, is_user_sql, sql_exec_addr))) { + LOG_WARN("execute sql failed", K(ret), K(tenant_id), K(conn), K(start), K(sql)); } else if (old_compatibility_mode != compatibility_mode && OB_FAIL(conn->set_session_variable("ob_compatibility_mode", old_compatibility_mode))) { LOG_WARN("fail to recover inner connection sql mode", K(ret)); diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h index 25cf2dbc71..601c629441 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_proxy.h @@ -67,7 +67,7 @@ struct ObSessionParam final { public: ObSessionParam() - : sql_mode_(nullptr), tz_info_wrap_(nullptr), ddl_info_(), is_load_data_exec_(false), nls_formats_{} + : sql_mode_(nullptr), tz_info_wrap_(nullptr), ddl_info_(), is_load_data_exec_(false), use_external_session_(false), nls_formats_{} {} ~ObSessionParam() = default; public: @@ -75,6 +75,7 @@ public: ObTimeZoneInfoWrap *tz_info_wrap_; ObSessionDDLInfo ddl_info_; bool is_load_data_exec_; + bool use_external_session_; // need init remote inner sql conn with sess getting from sess mgr common::ObString nls_formats_[common::ObNLSFormatEnum::NLS_MAX]; }; @@ -100,6 +101,7 @@ public: // execute query and return data result virtual int read(ReadResult &res, const uint64_t tenant_id, const char *sql) override; int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const ObSessionParam *session_param); + int read(ReadResult &res, const uint64_t tenant_id, const char *sql, const common::ObAddr *sql_exec_addr); //only for across cluster //cluster_id can not GCONF.cluster_id virtual int read(ReadResult &res, @@ -110,7 +112,8 @@ public: // execute update sql virtual int write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows) override; int write(const uint64_t tenant_id, const ObString sql, int64_t &affected_rows, int64_t compatibility_mode, - const ObSessionParam *session_param = nullptr); + const ObSessionParam *session_param = nullptr, + const common::ObAddr *sql_exec_addr = nullptr); using ObISQLClient::write; bool is_inited() const { return NULL != pool_; } @@ -129,7 +132,7 @@ protected: int acquire(sqlclient::ObISQLConnection *&conn) { return this->acquire(OB_INVALID_TENANT_ID, conn); } int acquire(const uint64_t tenant_id, sqlclient::ObISQLConnection *&conn); int read(sqlclient::ObISQLConnection *conn, ReadResult &result, - const uint64_t tenant_id, const char *sql); + const uint64_t tenant_id, const char *sql, const common::ObAddr *sql_exec_addr = nullptr); sqlclient::ObISQLConnectionPool *pool_; diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index fb61fce90c..658bf60783 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -736,6 +736,8 @@ PCODE_DEF(OB_BATCH_GET_TABLET_AUTOINC_SEQ, 0x952) PCODE_DEF(OB_BATCH_SET_TABLET_AUTOINC_SEQ, 0x953) PCODE_DEF(OB_REMOTE_WRITE_DDL_PREPARE_LOG, 0x954) +PCODE_DEF(OB_DDL_CHECK_TABLET_MERGE_STATUS, 0x957) + // Depedency Detector PCODE_DEF(OB_DETECTOR_LCL_MESSAGE, 0x9F0) PCODE_DEF(OB_DETECTOR_COLLECT_INFO_MESSAGE, 0x9F1) diff --git a/src/observer/ob_inner_sql_connection.cpp b/src/observer/ob_inner_sql_connection.cpp index a9fa579f10..c5883f3ad5 100644 --- a/src/observer/ob_inner_sql_connection.cpp +++ b/src/observer/ob_inner_sql_connection.cpp @@ -149,7 +149,8 @@ ObInnerSQLConnection::ObInnerSQLConnection() resource_conn_id_(OB_INVALID_ID), last_query_timestamp_(0), force_remote_execute_(false), - force_no_reuse_(false) + force_no_reuse_(false), + use_external_session_(false) { } @@ -309,16 +310,22 @@ void ObInnerSQLConnection::set_is_load_data_exec(bool v) get_session().set_load_data_exec_session(v); } -int ObInnerSQLConnection::init_session(sql::ObSQLSessionInfo* extern_session, const bool is_ddl) +int ObInnerSQLConnection::init_session_info( + sql::ObSQLSessionInfo *session, + const bool is_extern_session, + const bool is_oracle_mode, + const bool is_ddl) { int ret = OB_SUCCESS; - if (NULL == extern_session) { + if (NULL == session) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fail to init session info, not pointer", K(ret), KPC(session)); + } else { // called in init(), can not check inited_ flag. - ObArenaAllocator *allocator = NULL; const bool print_info_log = false; const bool is_sys_tenant = true; ObPCMemPctConf pc_mem_conf; - inner_session_.set_inner_session(); + session->set_inner_session(); ObObj mysql_mode; ObObj oracle_mode; mysql_mode.set_int(0); @@ -327,46 +334,70 @@ int ObInnerSQLConnection::init_session(sql::ObSQLSessionInfo* extern_session, co ObObj oracle_sql_mode; mysql_sql_mode.set_uint(ObUInt64Type, DEFAULT_MYSQL_MODE); oracle_sql_mode.set_uint(ObUInt64Type, DEFAULT_ORACLE_MODE); - if (OB_FAIL(inner_session_.init(INNER_SQL_SESS_ID, INNER_SQL_PROXY_SESS_ID, allocator))) { - LOG_WARN("init session failed", K(ret)); - } else if (OB_FAIL(inner_session_.load_default_sys_variable(print_info_log, is_sys_tenant))) { + if (OB_FAIL(session->load_default_sys_variable(print_info_log, is_sys_tenant))) { LOG_WARN("session load default system variable failed", K(ret)); - } else if (OB_FAIL(inner_session_.update_max_packet_size())) { + } else if (OB_FAIL(session->update_max_packet_size())) { LOG_WARN("fail to update max packet size", K(ret)); - } else if (OB_FAIL(inner_session_.init_tenant(OB_SYS_TENANT_NAME, OB_SYS_TENANT_ID))) { + } else if (OB_FAIL(session->init_tenant(OB_SYS_TENANT_NAME, OB_SYS_TENANT_ID))) { LOG_WARN("fail to init tenant", K(ret)); - } else if (OB_FAIL(switch_tenant(OB_SYS_TENANT_ID))) { - LOG_WARN("set system tenant id failed", K(ret)); - } else if (OB_FAIL(inner_session_.update_sys_variable(SYS_VAR_SQL_MODE, - oracle_mode_ ? oracle_sql_mode : mysql_sql_mode))) { - LOG_WARN("update sys variables failed", K(ret)); - } else if (OB_FAIL(inner_session_.update_sys_variable(SYS_VAR_OB_COMPATIBILITY_MODE, - oracle_mode_ ? oracle_mode : mysql_mode))) { - LOG_WARN("update sys variables failed", K(ret)); - } else if (OB_FAIL(inner_session_.update_sys_variable(SYS_VAR_NLS_DATE_FORMAT, - ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT))) { - LOG_WARN("update sys variables failed", K(ret)); - } else if (OB_FAIL(inner_session_.update_sys_variable(SYS_VAR_NLS_TIMESTAMP_FORMAT, - ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT))) { - LOG_WARN("update sys variables failed", K(ret)); - } else if (OB_FAIL(inner_session_.update_sys_variable(SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT, - ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT))) { - LOG_WARN("update sys variables failed", K(ret)); - } else if (OB_FAIL(inner_session_.gen_configs_in_pc_str())) { - LOG_WARN("fail to generate configuration strings that can influence execution plan", K(ret)); } else { - ObString database_name(OB_SYS_DATABASE_NAME); - if (OB_FAIL(inner_session_.set_default_database(database_name))) { - LOG_WARN("fail to set default database", K(ret), K(database_name)); - } else if (OB_FAIL(inner_session_.get_pc_mem_conf(pc_mem_conf))) { - LOG_WARN("fail to get pc mem conf", K(ret)); - } else { - inner_session_.set_plan_cache_manager(ob_sql_->get_plan_cache_manager()); - inner_session_.set_database_id(OB_SYS_DATABASE_ID); - //TODO shengle ? - inner_session_.get_ddl_info().set_is_ddl(is_ddl); - inner_session_.reset_timezone(); + if (!is_extern_session) { // if not exetern session + if(OB_FAIL(session->switch_tenant(OB_SYS_TENANT_ID))) { + LOG_WARN("Init sys tenant in session error", K(ret)); + } else if (OB_FAIL(session->set_user(OB_SYS_USER_NAME, OB_SYS_HOST_NAME, OB_SYS_USER_ID))) { + LOG_WARN("Set sys user in session error", K(ret)); + } else { + session->set_user_priv_set(OB_PRIV_ALL | OB_PRIV_GRANT); + session->set_database_id(OB_SYS_DATABASE_ID); + } } + if (OB_SUCC(ret)) { + if (OB_FAIL(session->update_sys_variable( + SYS_VAR_SQL_MODE, is_oracle_mode ? oracle_sql_mode : mysql_sql_mode))) { + LOG_WARN("update sys variables failed", K(ret)); + } else if (OB_FAIL(session->update_sys_variable( + SYS_VAR_OB_COMPATIBILITY_MODE, is_oracle_mode ? oracle_mode : mysql_mode))) { + LOG_WARN("update sys variables failed", K(ret)); + } else if (OB_FAIL(session->update_sys_variable( + SYS_VAR_NLS_DATE_FORMAT, ObTimeConverter::COMPAT_OLD_NLS_DATE_FORMAT))) { + LOG_WARN("update sys variables failed", K(ret)); + } else if (OB_FAIL(session->update_sys_variable( + SYS_VAR_NLS_TIMESTAMP_FORMAT, ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_FORMAT))) { + LOG_WARN("update sys variables failed", K(ret)); + } else if (OB_FAIL(session->update_sys_variable( + SYS_VAR_NLS_TIMESTAMP_TZ_FORMAT, ObTimeConverter::COMPAT_OLD_NLS_TIMESTAMP_TZ_FORMAT))) { + LOG_WARN("update sys variables failed", K(ret)); + } else { + ObString database_name(OB_SYS_DATABASE_NAME); + if (OB_FAIL(session->set_default_database(database_name))) { + LOG_WARN("fail to set default database", K(ret), K(database_name)); + } else if (OB_FAIL(session->get_pc_mem_conf(pc_mem_conf))) { + LOG_WARN("fail to get pc mem conf", K(ret)); + } else { + session->set_plan_cache_manager(GCTX.sql_engine_->get_plan_cache_manager()); + session->set_database_id(OB_SYS_DATABASE_ID); + //TODO shengle ? + session->get_ddl_info().set_is_ddl(is_ddl); + session->reset_timezone(); + } + } + } + } + } + return ret; +} + +int ObInnerSQLConnection::init_session(sql::ObSQLSessionInfo* extern_session, const bool is_ddl) +{ + int ret = OB_SUCCESS; + if (NULL == extern_session) { + sql::ObSQLSessionInfo * inner_session = &inner_session_; + ObArenaAllocator *allocator = NULL; + const bool is_extern_session = false; + if (OB_FAIL(inner_session->init(INNER_SQL_SESS_ID, INNER_SQL_PROXY_SESS_ID, allocator))) { + LOG_WARN("init session failed", K(ret)); + } else if (OB_FAIL(init_session_info(inner_session, is_extern_session, oracle_mode_, is_ddl))) { + LOG_WARN("fail to init session info", K(ret), KPC(inner_session)); } } else { extern_session_ = extern_session; @@ -666,6 +697,7 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor &executor, // switch tenant for MTL tenant ctx uint64_t tenant_id = get_session().get_effective_tenant_id(); + if (OB_SUCC(ret)) { MTL_SWITCH(tenant_id) { execution_id = ob_sql_->get_execution_id(); @@ -710,6 +742,7 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor &executor, } const uint64_t tenant_id = get_session().get_effective_tenant_id(); + if (OB_FAIL(ret)){ // do nothing } else if (OB_FAIL(gctx.schema_service_->get_tenant_schema_guard(tenant_id, res.schema_guard_))) { @@ -731,7 +764,7 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor &executor, // do nothing } else if (OB_FAIL(SMART_CALL(do_query(executor, res)))) { ret_code = ret; - LOG_WARN("execute failed", K(ret), K(executor), K(retry_cnt)); + LOG_WARN("execute failed", K(ret), K(tenant_id), K(executor), K(retry_cnt)); ret = process_retry(res, ret, abs_timeout_us, need_retry, retry_cnt, is_from_pl); // moved here from ObInnerSQLConnection::do_query() -> ObInnerSQLResult::open(). int close_ret = res.force_close(); @@ -1253,7 +1286,7 @@ int ObInnerSQLConnection::start_transaction_inner( sql, ObInnerSQLTransmitArg::OPERATION_TYPE_START_TRANSACTION, lib::Worker::CompatMode::ORACLE == get_compat_mode(), GCONF.cluster_id, THIS_WORKER.get_timeout_ts(), query_timeout, trx_timeout, - sql_mode, ddl_info, is_load_data_exec); + sql_mode, ddl_info, is_load_data_exec, use_external_session_); arg.set_nls_formats(get_session().get_local_nls_date_format(), get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); @@ -1595,7 +1628,7 @@ int ObInnerSQLConnection::forward_request_(const uint64_t tenant_id, sql, (ObInnerSQLTransmitArg::InnerSQLOperationType)op_type, lib::Worker::CompatMode::ORACLE == get_compat_mode(), GCONF.cluster_id, THIS_WORKER.get_timeout_ts(), query_timeout, - trx_timeout, sql_mode, ddl_info, is_load_data_exec);; + trx_timeout, sql_mode, ddl_info, is_load_data_exec, use_external_session_); arg.set_nls_formats(get_session().get_local_nls_date_format(), get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); @@ -1667,7 +1700,7 @@ int ObInnerSQLConnection::rollback() ObString::make_string("ROLLBACK"), ObInnerSQLTransmitArg::OPERATION_TYPE_ROLLBACK, lib::Worker::CompatMode::ORACLE == get_compat_mode(), GCONF.cluster_id, THIS_WORKER.get_timeout_ts(), query_timeout, trx_timeout, sql_mode, - ddl_info, is_load_data_exec); + ddl_info, is_load_data_exec, use_external_session_); arg.set_nls_formats(get_session().get_local_nls_date_format(), get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); @@ -1738,7 +1771,7 @@ int ObInnerSQLConnection::commit() ObString::make_string("COMMIT"), ObInnerSQLTransmitArg::OPERATION_TYPE_COMMIT, lib::Worker::CompatMode::ORACLE == get_compat_mode(), GCONF.cluster_id, THIS_WORKER.get_timeout_ts(), query_timeout, trx_timeout, sql_mode, - ddl_info, is_load_data_exec); + ddl_info, is_load_data_exec, use_external_session_); arg.set_nls_formats(get_session().get_local_nls_date_format(), get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); @@ -1769,20 +1802,20 @@ int ObInnerSQLConnection::commit() } int ObInnerSQLConnection::execute_write(const uint64_t tenant_id, const char *sql, - int64_t &affected_rows, bool is_user_sql) + int64_t &affected_rows, bool is_user_sql, const common::ObAddr *sql_exec_addr) { int ret = OB_SUCCESS; - if (OB_FAIL(execute_write(tenant_id, ObString::make_string(sql), affected_rows, is_user_sql))) { + if (OB_FAIL(execute_write(tenant_id, ObString::make_string(sql), affected_rows, is_user_sql, sql_exec_addr))) { LOG_WARN("execute_write failed", K(ret), K(tenant_id), K(sql)); } return ret; } int ObInnerSQLConnection::execute_write(const uint64_t tenant_id, const ObString &sql, - int64_t &affected_rows, bool is_user_sql) + int64_t &affected_rows, bool is_user_sql, const common::ObAddr *sql_exec_addr) { int ret = OB_SUCCESS; - auto function = [&]() { return execute_write_inner(tenant_id, sql, affected_rows, is_user_sql); }; + auto function = [&]() { return execute_write_inner(tenant_id, sql, affected_rows, is_user_sql, sql_exec_addr); }; if (OB_FAIL(retry_while_no_tenant_resource(GCONF.cluster_id, tenant_id, function))) { LOG_WARN("execute_write failed", K(ret), K(tenant_id), K(sql), K(is_user_sql)); } @@ -1790,7 +1823,7 @@ int ObInnerSQLConnection::execute_write(const uint64_t tenant_id, const ObString } int ObInnerSQLConnection::execute_write_inner(const uint64_t tenant_id, const ObString &sql, - int64_t &affected_rows, bool is_user_sql) + int64_t &affected_rows, bool is_user_sql, const common::ObAddr *sql_exec_addr) { int ret = OB_SUCCESS; FLTSpanGuard(inner_execute_write); @@ -1838,6 +1871,9 @@ int ObInnerSQLConnection::execute_write_inner(const uint64_t tenant_id, const Ob LOG_WARN("resource_conn_id or resource_svr is invalid", K(ret), K(get_resource_svr()), K(get_resource_conn_id())); } + } else if (!OB_ISNULL(sql_exec_addr)) { // not in trans + set_resource_svr(*sql_exec_addr); + set_resource_conn_id(OB_INVALID_ID); } else { // not in trans common::ObAddr resource_server_addr; share::ObLSID ls_id(share::ObLSID::SYS_LS_ID); @@ -1866,7 +1902,7 @@ int ObInnerSQLConnection::execute_write_inner(const uint64_t tenant_id, const Ob sql, ObInnerSQLTransmitArg::OPERATION_TYPE_EXECUTE_WRITE, lib::Worker::CompatMode::ORACLE == get_compat_mode(), GCONF.cluster_id, THIS_WORKER.get_timeout_ts(), query_timeout, trx_timeout, sql_mode, - ddl_info, is_load_data_exec); + ddl_info, is_load_data_exec, use_external_session_); arg.set_nls_formats(get_session().get_local_nls_date_format(), get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); @@ -1926,9 +1962,10 @@ int ObInnerSQLConnection::execute_read(const uint64_t tenant_id, const char *sql, ObISQLClient::ReadResult &res, bool is_user_sql, - bool is_from_pl) + bool is_from_pl, + const common::ObAddr *sql_exec_addr) { - return execute_read(GCONF.cluster_id, tenant_id, sql, res, is_user_sql, is_from_pl); + return execute_read(GCONF.cluster_id, tenant_id, sql, res, is_user_sql, is_from_pl, sql_exec_addr); } int ObInnerSQLConnection::execute_read(const int64_t cluster_id, @@ -1936,11 +1973,12 @@ int ObInnerSQLConnection::execute_read(const int64_t cluster_id, const ObString &sql, ObISQLClient::ReadResult &res, bool is_user_sql, - bool is_from_pl) + bool is_from_pl, + const common::ObAddr *sql_exec_addr) { int ret = OB_SUCCESS; - auto function = [&]() { return execute_read_inner(cluster_id, tenant_id, sql, res, is_user_sql, is_from_pl); }; + auto function = [&]() { return execute_read_inner(cluster_id, tenant_id, sql, res, is_user_sql, is_from_pl, sql_exec_addr); }; if (OB_FAIL(retry_while_no_tenant_resource(cluster_id, tenant_id, function))) { LOG_WARN("execute_read failed", K(ret), K(cluster_id), K(tenant_id)); } @@ -1972,7 +2010,8 @@ int ObInnerSQLConnection::execute_read_inner(const int64_t cluster_id, const ObString &sql, ObISQLClient::ReadResult &res, bool is_user_sql, - bool is_from_pl) + bool is_from_pl, + const common::ObAddr *sql_exec_addr) { int ret = OB_SUCCESS; FLTSpanGuard(inner_execute_read); @@ -2024,7 +2063,11 @@ int ObInnerSQLConnection::execute_read_inner(const int64_t cluster_id, ret = OB_NOT_SUPPORTED; LOG_WARN("can not acrocc cluster in trans", KR(ret), K(resource_conn_id_)); } - } else { // not in trans + } else if (!OB_ISNULL(sql_exec_addr)) { + set_resource_svr(*sql_exec_addr); + set_resource_conn_id(OB_INVALID_ID); + LOG_INFO("set sql exec addr", KR(ret), K(*sql_exec_addr)); + } else { common::ObAddr resource_server_addr; share::ObLSID ls_id(share::ObLSID::SYS_LS_ID); if (OB_FAIL(nonblock_get_leader( @@ -2052,7 +2095,7 @@ int ObInnerSQLConnection::execute_read_inner(const int64_t cluster_id, sql, ObInnerSQLTransmitArg::OPERATION_TYPE_EXECUTE_READ, lib::Worker::CompatMode::ORACLE == get_compat_mode(), GCONF.cluster_id, THIS_WORKER.get_timeout_ts(), query_timeout, trx_timeout, sql_mode, - ddl_info, is_load_data_exec); + ddl_info, is_load_data_exec, use_external_session_); arg.set_nls_formats(get_session().get_local_nls_date_format(), get_session().get_local_nls_timestamp_format(), get_session().get_local_nls_timestamp_tz_format()); diff --git a/src/observer/ob_inner_sql_connection.h b/src/observer/ob_inner_sql_connection.h index 67d57874fd..c27bcd589c 100644 --- a/src/observer/ob_inner_sql_connection.h +++ b/src/observer/ob_inner_sql_connection.h @@ -132,16 +132,20 @@ public: inline void reset() { destroy(); } virtual int execute_read(const uint64_t tenant_id, const char *sql, common::ObISQLClient::ReadResult &res, - bool is_user_sql = false, bool is_from_pl = false) override; + bool is_user_sql = false, bool is_from_pl = false, + const common::ObAddr *sql_exec_addr = nullptr/* ddl inner sql execution addr */) override; virtual int execute_read(const int64_t cluster_id, const uint64_t tenant_id, const ObString &sql, common::ObISQLClient::ReadResult &res, - bool is_user_sql = false, bool is_from_pl = false) override; + bool is_user_sql = false, bool is_from_pl = false, + const common::ObAddr *sql_exec_addr = nullptr/* ddl inner sql execution addr */) override; virtual int execute_write(const uint64_t tenant_id, const char *sql, int64_t &affected_rows, - bool is_user_sql = false) override; + bool is_user_sql = false, + const common::ObAddr *sql_exec_addr = nullptr/* ddl inner sql execution addr */) override; virtual int execute_write(const uint64_t tenant_id, const ObString &sql, int64_t &affected_rows, - bool is_user_sql = false) override; + bool is_user_sql = false, + const common::ObAddr *sql_exec_addr = nullptr) override; virtual int start_transaction(const uint64_t &tenant_id, bool with_snap_shot = false) override; virtual int register_multi_data_source(const uint64_t &tenant_id, const share::ObLSID ls_id, @@ -174,6 +178,7 @@ public: virtual void set_nls_formats(const ObString *nls_formats); virtual void set_is_load_data_exec(bool v); virtual void set_force_remote_exec(bool v) { force_remote_execute_ = v; } + virtual void set_use_external_session(bool v) { use_external_session_ = v; } bool is_nested_conn(); void ref(); @@ -282,6 +287,11 @@ public: const sql::stmt::StmtType type, bool is_from_pl = false); + static int init_session_info(sql::ObSQLSessionInfo *session, + const bool is_extern_session, + const bool is_oracle_mode, + const bool is_ddl); + int64_t get_init_timestamp() const { return init_timestamp_; } public: @@ -354,9 +364,9 @@ private: int execute_read_inner(const int64_t cluster_id, const uint64_t tenant_id, const ObString &sql, common::ObISQLClient::ReadResult &res, - bool is_user_sql = false, bool is_from_pl = false); + bool is_user_sql = false, bool is_from_pl = false, const common::ObAddr *sql_exec_addr = nullptr); int execute_write_inner(const uint64_t tenant_id, const ObString &sql, int64_t &affected_rows, - bool is_user_sql = false); + bool is_user_sql = false, const common::ObAddr *sql_exec_addr = nullptr); int start_transaction_inner(const uint64_t &tenant_id, bool with_snap_shot = false); template int retry_while_no_tenant_resource(const int64_t cluster_id, const uint64_t &tenant_id, T function); @@ -393,7 +403,7 @@ private: bool is_in_trans_; bool is_resource_conn_; bool is_idle_; // for resource_conn_ - common::ObAddr resource_svr_; + common::ObAddr resource_svr_; // server of destination in local rpc call uint64_t resource_conn_id_; // resource conn_id of dst srv int64_t last_query_timestamp_; /* @@ -413,6 +423,10 @@ private: bool force_remote_execute_; bool force_no_reuse_; + // ask the inner sql connection to use external session instead of internal one + // this enables show session / kill session using sql query command + bool use_external_session_; + DISABLE_COPY_ASSIGN(ObInnerSQLConnection); }; diff --git a/src/observer/ob_inner_sql_rpc_processor.cpp b/src/observer/ob_inner_sql_rpc_processor.cpp index 2e90118db4..6a858eea4b 100644 --- a/src/observer/ob_inner_sql_rpc_processor.cpp +++ b/src/observer/ob_inner_sql_rpc_processor.cpp @@ -21,9 +21,11 @@ #include "lib/oblog/ob_log_module.h" #include "lib/container/ob_iarray.h" #include "storage/tx/ob_multi_data_source.h" +#include "sql/plan_cache/ob_plan_cache_util.h" using namespace oceanbase::common; using namespace oceanbase::share::schema; +using namespace oceanbase::sql; namespace oceanbase { @@ -247,12 +249,70 @@ int ObInnerSqlRpcP::process_lock_tablet(sqlclient::ObISQLConnection *conn, return ret; } +int ObInnerSqlRpcP::create_tmp_session( + uint64_t tenant_id, + sql::ObSQLSessionInfo *&tmp_session, + sql::ObFreeSessionCtx &free_session_ctx, + const bool is_oracle_mode) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(NULL != tmp_session)) { + ret = OB_INIT_TWICE; + LOG_WARN("tmp_session is not null.", K(ret)); + } else if (NULL == GCTX.session_mgr_) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session manager is NULL", K(ret)); + } else { + uint32_t sid = sql::ObSQLSessionInfo::INVALID_SESSID; + uint64_t proxy_sid = 0; + if (OB_FAIL(GCTX.session_mgr_->create_sessid(sid))) { + LOG_WARN("alloc session id failed", K(ret)); + } else if (OB_FAIL(GCTX.session_mgr_->create_session(tenant_id, sid, proxy_sid, + ObTimeUtility::current_time(), + tmp_session))) { + GCTX.session_mgr_->mark_sessid_unused(sid); + tmp_session = NULL; + LOG_WARN("create session failed", K(ret), K(sid)); + } else { + const bool is_extern_session = true; + if (OB_NOT_NULL(tmp_session) + && OB_FAIL(observer::ObInnerSQLConnection::init_session_info( + tmp_session, + is_extern_session, + is_oracle_mode, + false))) { + LOG_WARN("fail to init session info", K(ret), KPC(tmp_session)); + } + free_session_ctx.sessid_ = sid; + free_session_ctx.proxy_sessid_ = proxy_sid; + } + } + return ret; +} + +void ObInnerSqlRpcP::cleanup_tmp_session( + sql::ObSQLSessionInfo *tmp_session, + sql::ObFreeSessionCtx &free_session_ctx) +{ + if (NULL != GCTX.session_mgr_ && NULL != tmp_session) { + tmp_session->set_session_sleep(); + GCTX.session_mgr_->revert_session(tmp_session); + GCTX.session_mgr_->free_session(free_session_ctx); + tmp_session = NULL; + GCTX.session_mgr_->mark_sessid_unused(free_session_ctx.sessid_); + } + ObActiveSessionGuard::setup_default_ash(); // enforce cleanup for future RPC cases +} + int ObInnerSqlRpcP::process() { int ret = OB_SUCCESS; const ObInnerSQLTransmitArg &transmit_arg = arg_; ObInnerSQLTransmitResult &transmit_result = result_; + sql::ObFreeSessionCtx free_session_ctx; + sql::ObSQLSessionInfo *tmp_session = NULL; // session got from session_mgr_ + if (OB_ISNULL(gctx_.res_inner_conn_pool_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("null ptr", K(ret), K(gctx_.sql_proxy_)); @@ -286,17 +346,39 @@ int ObInnerSqlRpcP::process() //only read can across cluster ret = OB_ERR_UNEXPECTED; LOG_ERROR("only read can execute across cluster", KR(ret), K(owner_cluster_id), K(transmit_arg)); - } else if (OB_FAIL(sql_str.assign_fmt("%.*s", transmit_arg.get_inner_sql().length(), - transmit_arg.get_inner_sql().ptr()))) { - LOG_WARN("assign sql to write_sql failed", K(ret), K(transmit_arg.get_inner_sql())); + } else if (transmit_arg.get_use_external_session() + && ((ObInnerSQLTransmitArg::OPERATION_TYPE_EXECUTE_READ != transmit_arg.get_operation_type() && ObInnerSQLTransmitArg::OPERATION_TYPE_EXECUTE_WRITE != transmit_arg.get_operation_type()) + || OB_INVALID_ID != transmit_arg.get_conn_id())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("only init remote conn with sess in read/write and not in trans", KR(ret), K(transmit_arg)); } else if (OB_ISNULL(pool)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ptr is null", K(ret)); + } else if (transmit_arg.get_use_external_session() + && OB_FAIL(create_tmp_session(transmit_arg.get_tenant_id(), tmp_session, free_session_ctx, transmit_arg.get_is_oracle_mode()))) { + LOG_WARN("fail to create_tmp_session", K(ret), K(transmit_arg)); + } else if (OB_FAIL(sql_str.assign_fmt("%.*s", transmit_arg.get_inner_sql().length(), + transmit_arg.get_inner_sql().ptr()))) { + LOG_WARN("assign sql to write_sql failed", K(ret), K(transmit_arg.get_inner_sql())); } else if (OB_FAIL(pool->acquire(transmit_arg.get_conn_id(), transmit_arg.get_is_oracle_mode(), ObInnerSQLTransmitArg::OPERATION_TYPE_ROLLBACK == transmit_arg.get_operation_type(), - conn))) { + conn, tmp_session))) { + cleanup_tmp_session(tmp_session, free_session_ctx); LOG_WARN("failed to acquire inner connection", K(ret), K(transmit_arg)); } + /* init session info */ + if (OB_NOT_NULL(tmp_session)) { + tmp_session->set_current_trace_id(ObCurTraceId::get_trace_id()); + tmp_session->switch_tenant(transmit_arg.get_tenant_id()); + ObString sql_stmt(sql_str.ptr()); + if (OB_FAIL(tmp_session->set_session_active( + sql_stmt, + 0, /* ignore this parameter */ + ObTimeUtility::current_time(), + obmysql::COM_QUERY))) { + LOG_WARN("failed to set tmp session active", K(ret)); + } + } if (OB_FAIL(ret)) { } else if (OB_ISNULL(inner_conn = static_cast(conn))) { // do nothing, because conn was set null if need kill_using_conn in aquire @@ -386,6 +468,7 @@ int ObInnerSqlRpcP::process() transmit_result.set_err_code(ret); } } + cleanup_tmp_session(tmp_session, free_session_ctx); } return ret; diff --git a/src/observer/ob_inner_sql_rpc_processor.h b/src/observer/ob_inner_sql_rpc_processor.h index d5c07768d8..6ddc089f58 100644 --- a/src/observer/ob_inner_sql_rpc_processor.h +++ b/src/observer/ob_inner_sql_rpc_processor.h @@ -18,6 +18,7 @@ #include "observer/ob_inner_sql_rpc_proxy.h" #include "rpc/obrpc/ob_rpc_proxy.h" #include "rpc/obrpc/ob_rpc_processor.h" +#include "sql/session/ob_sql_session_mgr.h" namespace oceanbase { @@ -39,6 +40,15 @@ public: public: virtual int process(); private: + int create_tmp_session( + uint64_t tenant_id, + sql::ObSQLSessionInfo *&tmp_session, + sql::ObFreeSessionCtx &free_session_ctx, + const bool is_oracle_mode); + void cleanup_tmp_session( + sql::ObSQLSessionInfo *tmp_session, + sql::ObFreeSessionCtx &free_session_ctx); + int process_start_transaction( sqlclient::ObISQLConnection *conn, const ObSqlString &start_trans_sql, diff --git a/src/observer/ob_inner_sql_rpc_proxy.cpp b/src/observer/ob_inner_sql_rpc_proxy.cpp index 36689a570f..8c06aede0f 100644 --- a/src/observer/ob_inner_sql_rpc_proxy.cpp +++ b/src/observer/ob_inner_sql_rpc_proxy.cpp @@ -38,7 +38,8 @@ OB_DEF_SERIALIZE(obrpc::ObInnerSQLTransmitArg) tz_info_wrap_, ddl_info_, is_load_data_exec_, - nls_formats_); + nls_formats_, + use_external_session_); return ret; } @@ -61,7 +62,8 @@ OB_DEF_DESERIALIZE(obrpc::ObInnerSQLTransmitArg) tz_info_wrap_, ddl_info_, is_load_data_exec_, - nls_formats_); + nls_formats_, + use_external_session_); if (OB_SUCC(ret)) { (void)sql::ObSQLUtils::adjust_time_by_ntp_offset(worker_timeout_); } @@ -87,7 +89,8 @@ OB_DEF_SERIALIZE_SIZE(obrpc::ObInnerSQLTransmitArg) tz_info_wrap_, ddl_info_, is_load_data_exec_, - nls_formats_); + nls_formats_, + use_external_session_); return len; } // https://work.aone.alibaba-inc.com/issue/40701293 diff --git a/src/observer/ob_inner_sql_rpc_proxy.h b/src/observer/ob_inner_sql_rpc_proxy.h index d92405e2b6..2bb0ec210b 100644 --- a/src/observer/ob_inner_sql_rpc_proxy.h +++ b/src/observer/ob_inner_sql_rpc_proxy.h @@ -48,19 +48,22 @@ public: is_oracle_mode_(false), source_cluster_id_(OB_INVALID_CLUSTER_ID), worker_timeout_(OB_DEFAULT_SESSION_TIMEOUT), query_timeout_(OB_DEFAULT_SESSION_TIMEOUT), trx_timeout_(OB_DEFAULT_SESSION_TIMEOUT), - sql_mode_(0), tz_info_wrap_(), ddl_info_(), is_load_data_exec_(false), nls_formats_{} {}; + sql_mode_(0), tz_info_wrap_(), ddl_info_(), is_load_data_exec_(false), nls_formats_{}, + use_external_session_(false) {}; ObInnerSQLTransmitArg(common::ObAddr ctrl_svr, common::ObAddr runner_svr, uint64_t tenant_id, uint64_t conn_id, common::ObString inner_sql, InnerSQLOperationType operation_type, bool is_oracle_mode, const int64_t source_cluster_id, const int64_t worker_timeout, const int64_t query_timeout, const int64_t trx_timeout, - ObSQLMode sql_mode, ObSessionDDLInfo ddl_info, const bool is_load_data_exec) + ObSQLMode sql_mode, ObSessionDDLInfo ddl_info, const bool is_load_data_exec, + const bool use_external_session) : ctrl_svr_(ctrl_svr), runner_svr_(runner_svr), tenant_id_(tenant_id), conn_id_(conn_id), inner_sql_(inner_sql), operation_type_(operation_type), is_oracle_mode_(is_oracle_mode), source_cluster_id_(source_cluster_id), worker_timeout_(worker_timeout), query_timeout_(query_timeout), trx_timeout_(trx_timeout), sql_mode_(sql_mode), - tz_info_wrap_(), ddl_info_(ddl_info), is_load_data_exec_(false), nls_formats_{} {} + tz_info_wrap_(), ddl_info_(ddl_info), is_load_data_exec_(is_load_data_exec), nls_formats_{}, + use_external_session_(use_external_session) {} ~ObInnerSQLTransmitArg() {} const common::ObAddr &get_ctrl_svr() const { return ctrl_svr_; } @@ -124,6 +127,7 @@ public: ObSQLMode get_sql_mode() const { return sql_mode_; } bool get_is_load_data_exec() const { return is_load_data_exec_; } const ObString *get_nls_formats() const { return nls_formats_; } + bool get_use_external_session() const { return use_external_session_; } TO_STRING_KV(K_(ctrl_svr), K_(runner_svr), @@ -140,7 +144,8 @@ public: K_(tz_info_wrap), K_(ddl_info), K_(is_load_data_exec), - K_(nls_formats)); + K_(nls_formats), + K_(use_external_session)); private: common::ObAddr ctrl_svr_; @@ -159,6 +164,7 @@ private: ObSessionDDLInfo ddl_info_; bool is_load_data_exec_; common::ObString nls_formats_[common::ObNLSFormatEnum::NLS_MAX]; + bool use_external_session_; }; class ObInnerSQLTransmitResult diff --git a/src/observer/ob_resource_inner_sql_connection_pool.cpp b/src/observer/ob_resource_inner_sql_connection_pool.cpp index 4d3857a833..955817f983 100644 --- a/src/observer/ob_resource_inner_sql_connection_pool.cpp +++ b/src/observer/ob_resource_inner_sql_connection_pool.cpp @@ -79,7 +79,7 @@ int ObResourceInnerSQLConnectionPool::fetch_max_conn_id(uint64_t &max_conn_id) int ObResourceInnerSQLConnectionPool::acquire( const uint64_t conn_id, const bool is_oracle_mode, const bool kill_using_conn, - common::sqlclient::ObISQLConnection *&conn) + common::sqlclient::ObISQLConnection *&conn, sql::ObSQLSessionInfo *session_info) { int ret = OB_SUCCESS; ObLatchWGuard guard(lock_, ObLatchIds::DEFAULT_MUTEX); @@ -89,7 +89,10 @@ int ObResourceInnerSQLConnectionPool::acquire( ret = OB_NOT_INIT; LOG_WARN("ObResourceInnerSQLConnectionPool has not been inited", K(ret)); } else if (OB_INVALID_ID == conn_id) { - if (OB_FAIL(inner_sql_conn_pool_.acquire(NULL, conn, is_oracle_mode))) { + if (NULL != session_info) { + session_info->set_compatibility_mode(is_oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE); + } + if (OB_FAIL(inner_sql_conn_pool_.acquire(session_info, conn, is_oracle_mode))) { LOG_WARN("failed to acquire inner connection", K(ret)); } else if (FALSE_IT(inner_conn = static_cast(conn))) { } else if (OB_ISNULL(inner_conn)) { diff --git a/src/observer/ob_resource_inner_sql_connection_pool.h b/src/observer/ob_resource_inner_sql_connection_pool.h index 5a8081ebad..1585d17a53 100644 --- a/src/observer/ob_resource_inner_sql_connection_pool.h +++ b/src/observer/ob_resource_inner_sql_connection_pool.h @@ -37,7 +37,7 @@ public: const bool is_ddl = false); bool is_inited() { return is_inited_; } int acquire(const uint64_t conn_id, const bool is_oracle_mode, const bool kill_using_conn, - common::sqlclient::ObISQLConnection *&conn); + common::sqlclient::ObISQLConnection *&conn, sql::ObSQLSessionInfo *session_info); int release(const bool reuse_conn, common::sqlclient::ObISQLConnection *&conn); private: diff --git a/src/observer/ob_rpc_processor_simple.cpp b/src/observer/ob_rpc_processor_simple.cpp index a52ebe6a00..d39b3321a3 100644 --- a/src/observer/ob_rpc_processor_simple.cpp +++ b/src/observer/ob_rpc_processor_simple.cpp @@ -614,6 +614,18 @@ int ObRpcCheckCtxCreateTimestampElapsedP::process() return ret; } +int ObRpcDDLCheckTabletMergeStatusP::process() +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(gctx_.ob_service_)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid arguments", K(ret), KP(gctx_.ob_service_)); + } else { + ret = gctx_.ob_service_->check_ddl_tablet_merge_status(arg_, result_); + } + return ret; +} + int ObRpcSwitchLeaderP::process() { int ret = OB_SUCCESS; diff --git a/src/observer/ob_rpc_processor_simple.h b/src/observer/ob_rpc_processor_simple.h index 0e41c551ed..46e3abb24a 100644 --- a/src/observer/ob_rpc_processor_simple.h +++ b/src/observer/ob_rpc_processor_simple.h @@ -212,6 +212,7 @@ OB_DEFINE_PROCESSOR_S(Srv, OB_CHECK_BACKUP_DEST_CONNECTIVITY, ObRpcCheckBackupDe OB_DEFINE_PROCESSOR_S(Srv, OB_GET_LS_ACCESS_MODE, ObRpcGetLSAccessModeP); OB_DEFINE_PROCESSOR_S(Srv, OB_CHANGE_LS_ACCESS_MODE, ObRpcChangeLSAccessModeP); OB_DEFINE_PROCESSOR_S(Srv, OB_ESTIMATE_TABLET_BLOCK_COUNT, ObEstimateTabletBlockCountP); +OB_DEFINE_PROCESSOR_S(Srv, OB_DDL_CHECK_TABLET_MERGE_STATUS, ObRpcDDLCheckTabletMergeStatusP); } // end of namespace observer } // end of namespace oceanbase diff --git a/src/observer/ob_service.cpp b/src/observer/ob_service.cpp index 767290e85a..2925eb13ba 100644 --- a/src/observer/ob_service.cpp +++ b/src/observer/ob_service.cpp @@ -70,6 +70,7 @@ #include "share/backup/ob_backup_connectivity.h" #include "observer/report/ob_tenant_meta_checker.h"//ObTenantMetaChecker #include "storage/compaction/ob_tenant_tablet_scheduler.h" +#include "storage/ddl/ob_tablet_ddl_kv_mgr.h" namespace oceanbase { @@ -1128,6 +1129,54 @@ int ObService::check_schema_version_elapsed( return ret; } +int ObService::check_ddl_tablet_merge_status( + const obrpc::ObDDLCheckTabletMergeStatusArg &arg, + obrpc::ObDDLCheckTabletMergeStatusResult &result) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(!inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_UNLIKELY(!arg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(arg)); + } else { + result.reset(); + MTL_SWITCH(arg.tenant_id_) { + for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablet_ids_.count(); ++i) { + const common::ObTabletID &tablet_id = arg.tablet_ids_.at(i); + ObTabletHandle tablet_handle; + ObLSHandle ls_handle; + ObDDLKvMgrHandle ddl_kv_mgr_handle; + ObLSService *ls_service = nullptr; + bool status = false; + + if (OB_ISNULL(ls_service = MTL(ObLSService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("error unexpected, get ls service failed", K(ret)); + } else if (OB_UNLIKELY(!tablet_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguments", K(ret), K(arg)); + } else if (OB_FAIL(ls_service->get_ls(arg.ls_id_, ls_handle, ObLSGetMod::OBSERVER_MOD))) { + LOG_WARN("get ls failed", K(ret), K(arg)); + } else if (OB_FAIL(ls_handle.get_ls()->get_tablet(tablet_id, tablet_handle))) { + LOG_WARN("get tablet failed", K(ret)); + } + // check and update major status + if (OB_SUCC(ret)) { + ObSSTable *latest_major_sstable = static_cast( + tablet_handle.get_obj()->get_table_store().get_major_sstables().get_boundary_table(true/*last*/)); + status = nullptr != latest_major_sstable; + if (OB_FAIL(result.merge_status_.push_back(status))) { + LOG_WARN("fail to push back to array", K(ret), K(status), K(tablet_id)); + } + } + } + } + } + return ret; +} int ObService::batch_switch_rs_leader(const ObAddr &arg) { diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 79bd658090..09141e5bac 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -165,6 +165,10 @@ public: const obrpc::ObCheckModifyTimeElapsedArg &arg, obrpc::ObCheckModifyTimeElapsedResult &result); + int check_ddl_tablet_merge_status( + const obrpc::ObDDLCheckTabletMergeStatusArg &arg, + obrpc::ObDDLCheckTabletMergeStatusResult &result); + //////////////////////////////////////////////////////////////// // ObRpcBatchSwitchRsLeaderP @RS leader coordinator & admin int batch_switch_rs_leader(const ObAddr &arg); diff --git a/src/observer/ob_srv_xlator_storage.cpp b/src/observer/ob_srv_xlator_storage.cpp index fc996223f1..13ffaf419a 100644 --- a/src/observer/ob_srv_xlator_storage.cpp +++ b/src/observer/ob_srv_xlator_storage.cpp @@ -107,4 +107,5 @@ void oceanbase::observer::init_srv_xlator_for_storage(ObSrvRpcXlator *xlator) { RPC_PROCESSOR(ObRpcLSRemoveNonPaxosReplicaP, gctx_); RPC_PROCESSOR(ObRpcLSModifyPaxosReplicaNumberP, gctx_); RPC_PROCESSOR(ObRpcLSCheckDRTaskExistP, gctx_); + RPC_PROCESSOR(ObRpcDDLCheckTabletMergeStatusP, gctx_); } diff --git a/src/observer/virtual_table/ob_all_virtual_session_info.cpp b/src/observer/virtual_table/ob_all_virtual_session_info.cpp index ff600533a6..5ae304e02b 100644 --- a/src/observer/virtual_table/ob_all_virtual_session_info.cpp +++ b/src/observer/virtual_table/ob_all_virtual_session_info.cpp @@ -268,7 +268,7 @@ int ObAllVirtualSessionInfo::FillScanner::operator()( obmysql::COM_STMT_EXECUTE == sess_info->get_mysql_cmd() || obmysql::COM_STMT_PREPARE == sess_info->get_mysql_cmd() || obmysql::COM_STMT_PREXECUTE == sess_info->get_mysql_cmd()) { - int len = sess_info->get_last_trace_id().to_string(trace_id_, sizeof(trace_id_)); + int len = sess_info->get_current_trace_id().to_string(trace_id_, sizeof(trace_id_)); cur_row_->cells_[cell_idx].set_varchar(trace_id_, len); cur_row_->cells_[cell_idx].set_collation_type(default_collation); } else { diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp index 9d260920fe..6475e4abac 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.cpp @@ -44,11 +44,13 @@ ObDDLRedefinitionSSTableBuildTask::ObDDLRedefinitionSSTableBuildTask( const common::ObCurTraceId::TraceId &trace_id, const int64_t parallelism, const bool use_heap_table_ddl_plan, - ObRootService *root_service) + ObRootService *root_service, + const common::ObAddr &inner_sql_exec_addr) : is_inited_(false), tenant_id_(tenant_id), task_id_(task_id), data_table_id_(data_table_id), dest_table_id_(dest_table_id), schema_version_(schema_version), snapshot_version_(snapshot_version), execution_id_(execution_id), sql_mode_(sql_mode), trace_id_(trace_id), parallelism_(parallelism), - use_heap_table_ddl_plan_(use_heap_table_ddl_plan), root_service_(root_service) + use_heap_table_ddl_plan_(use_heap_table_ddl_plan), root_service_(root_service), + inner_sql_exec_addr_(inner_sql_exec_addr) { set_retry_times(0); // do not retry } @@ -83,6 +85,8 @@ int ObDDLRedefinitionSSTableBuildTask::process() const ObSysVariableSchema *sys_variable_schema = nullptr; ObDDLTaskKey task_key(dest_table_id_, schema_version_); bool oracle_mode = false; + bool need_exec_new_inner_sql = true; + if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ddl redefinition sstable build task not inited", K(ret)); @@ -99,7 +103,19 @@ int ObDDLRedefinitionSSTableBuildTask::process() LOG_WARN("sys variable schema is NULL", K(ret)); } else if (OB_FAIL(sys_variable_schema->get_oracle_mode(oracle_mode))) { LOG_WARN("get oracle mode failed", K(ret)); - } else if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, + } else { + (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, + dest_table_id_, + inner_sql_exec_addr_, + trace_id_, + schema_version_, + snapshot_version_, + need_exec_new_inner_sql); + if (!need_exec_new_inner_sql) { + LOG_INFO("succ to wait and complete old task finished!", K(ret)); + } else if (OB_FAIL(root_service_->get_ddl_scheduler().on_update_execution_id(task_id_, execution_id_))) { // genenal new ObIndexSSTableBuildTask::execution_id_ and persist to inner table + LOG_WARN("failed to update execution id", K(ret)); + } else if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, data_table_id_, dest_table_id_, schema_version_, @@ -111,36 +127,44 @@ int ObDDLRedefinitionSSTableBuildTask::process() false/*use_schema_version_hint_for_src_table*/, &col_name_map_, sql_string))) { - LOG_WARN("fail to generate build replica sql", K(ret)); - } else { - ObTimeoutCtx timeout_ctx; - common::ObCommonSqlProxy *user_sql_proxy = nullptr; - int64_t affected_rows = 0; - if (oracle_mode) { - sql_mode_ = SMO_STRICT_ALL_TABLES | SMO_PAD_CHAR_TO_FULL_LENGTH; - } - ObSessionParam session_param; - session_param.sql_mode_ = reinterpret_cast(&sql_mode_); - session_param.tz_info_wrap_ = &tz_info_wrap_; - session_param.ddl_info_.set_is_ddl(true); - session_param.ddl_info_.set_source_table_hidden(false); - session_param.ddl_info_.set_dest_table_hidden(true); - session_param.ddl_info_.set_heap_table_ddl(use_heap_table_ddl_plan_); - if (oracle_mode) { - user_sql_proxy = GCTX.ddl_oracle_sql_proxy_; + LOG_WARN("fail to generate build replica sql", K(ret)); } else { - user_sql_proxy = GCTX.ddl_sql_proxy_; - } - LOG_INFO("execute sql" , K(sql_string), K(data_table_id_), K(tenant_id_), - "is_strict_mode", is_strict_mode(sql_mode_), K(sql_mode_), K(parallelism_)); - if (OB_FAIL(timeout_ctx.set_trx_timeout_us(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { - LOG_WARN("set trx timeout failed", K(ret)); - } else if (OB_FAIL(timeout_ctx.set_timeout(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { - LOG_WARN("set timeout failed", K(ret)); - } else { - if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows, - oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param))) { - LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_)); + ObTimeoutCtx timeout_ctx; + common::ObCommonSqlProxy *user_sql_proxy = nullptr; + int64_t affected_rows = 0; + if (oracle_mode) { + sql_mode_ = SMO_STRICT_ALL_TABLES | SMO_PAD_CHAR_TO_FULL_LENGTH; + } + ObSessionParam session_param; + session_param.sql_mode_ = reinterpret_cast(&sql_mode_); + session_param.tz_info_wrap_ = &tz_info_wrap_; + session_param.ddl_info_.set_is_ddl(true); + session_param.ddl_info_.set_source_table_hidden(false); + session_param.ddl_info_.set_dest_table_hidden(true); + session_param.ddl_info_.set_heap_table_ddl(use_heap_table_ddl_plan_); + session_param.use_external_session_ = true; // means session id dispatched by session mgr + + common::ObAddr *sql_exec_addr = nullptr; + if (inner_sql_exec_addr_.is_valid()) { + sql_exec_addr = &inner_sql_exec_addr_; + LOG_INFO("inner sql execute addr" , K(*sql_exec_addr)); + } + if (oracle_mode) { + user_sql_proxy = GCTX.ddl_oracle_sql_proxy_; + } else { + user_sql_proxy = GCTX.ddl_sql_proxy_; + } + LOG_INFO("execute sql" , K(sql_string), K(data_table_id_), K(tenant_id_), + "is_strict_mode", is_strict_mode(sql_mode_), K(sql_mode_), K(parallelism_)); + if (OB_FAIL(timeout_ctx.set_trx_timeout_us(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { + LOG_WARN("set trx timeout failed", K(ret)); + } else if (OB_FAIL(timeout_ctx.set_timeout(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { + LOG_WARN("set timeout failed", K(ret)); + } else { + if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows, + oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param, sql_exec_addr))) { + LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_)); + } } } } @@ -173,7 +197,8 @@ ObAsyncTask *ObDDLRedefinitionSSTableBuildTask::deep_copy(char *buf, const int64 trace_id_, parallelism_, use_heap_table_ddl_plan_, - root_service_); + root_service_, + inner_sql_exec_addr_); if (OB_FAIL(new_task->tz_info_wrap_.deep_copy(tz_info_wrap_))) { LOG_WARN("failed to copy tz info wrap", K(ret)); } else if (OB_FAIL(new_task->col_name_map_.assign(col_name_map_))) { diff --git a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h index ebb6434466..73409a2f02 100644 --- a/src/rootserver/ddl_task/ob_ddl_redefinition_task.h +++ b/src/rootserver/ddl_task/ob_ddl_redefinition_task.h @@ -36,7 +36,8 @@ public: const common::ObCurTraceId::TraceId &trace_id, const int64_t parallelism, const bool use_heap_table_ddl_plan, - ObRootService *root_service); + ObRootService *root_service, + const common::ObAddr &inner_sql_exec_addr); int init( const ObTableSchema &orig_table_schema, const AlterTableSchema &alter_table_schema, @@ -61,6 +62,7 @@ private: int64_t parallelism_; bool use_heap_table_ddl_plan_; ObRootService *root_service_; + common::ObAddr inner_sql_exec_addr_; }; class ObSyncTabletAutoincSeqCtx final diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp index 25da88b672..3bb837901b 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.cpp +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.cpp @@ -1293,6 +1293,37 @@ int ObDDLScheduler::on_column_checksum_calc_reply( return ret; } +int ObDDLScheduler::on_update_execution_id( + const int64_t task_id, + int64_t &ret_execution_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_UNLIKELY(task_id <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid arguments", K(ret), K(task_id)); + } else { + ObDDLTask *ddl_task = nullptr; + if (OB_FAIL(task_queue_.get_task(task_id, ddl_task))) { + LOG_WARN("get task failed", K(ret), K(task_id)); + } else if (OB_ISNULL(ddl_task)) { + ret = OB_ERR_SYS; + LOG_WARN("ddl task must not be nullptr", K(ret)); + } else if (OB_FAIL(ddl_task->get_task_id() != task_id)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("update task id is diff from ddl task id", + K(ret), K(task_id), KPC(ddl_task)); + } else if (OB_FAIL(ddl_task->push_execution_id())) { + LOG_WARN("fail to push execution id", K(ret), KPC(ddl_task)); + } else { + ret_execution_id = ddl_task->get_execution_id(); + } + } + return ret; +} + int ObDDLScheduler::on_sstable_complement_job_reply( const common::ObTabletID &tablet_id, const ObDDLTaskKey &task_key, diff --git a/src/rootserver/ddl_task/ob_ddl_scheduler.h b/src/rootserver/ddl_task/ob_ddl_scheduler.h index 46949d914a..2617919201 100644 --- a/src/rootserver/ddl_task/ob_ddl_scheduler.h +++ b/src/rootserver/ddl_task/ob_ddl_scheduler.h @@ -127,6 +127,10 @@ public: const uint64_t autoinc_val, const int ret_code); + int on_update_execution_id( + const int64_t task_id, + int64_t &ret_execution_id); + private: class DDLIdling : public ObThreadIdling { diff --git a/src/rootserver/ddl_task/ob_ddl_task.cpp b/src/rootserver/ddl_task/ob_ddl_task.cpp index 3f812cf061..765d073cd2 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.cpp +++ b/src/rootserver/ddl_task/ob_ddl_task.cpp @@ -351,6 +351,11 @@ int ObDDLTask::switch_status(ObDDLTaskStatus new_status, const int ret_code) "new_state", real_new_status, K_(snapshot_version), ret_code_); task_status_ = real_new_status; } + + if (OB_CANCELED == real_ret_code) { + (void)ObDDLTaskRecordOperator::kill_task_inner_sql(root_service->get_sql_proxy(), + trace_id_, tenant_id_, sql_exec_addr_); // ignore return code + } } return ret; } @@ -623,7 +628,7 @@ int ObDDLTask::push_execution_id() LOG_WARN("start transaction failed", K(ret)); } else { if (OB_FAIL(ObDDLTaskRecordOperator::select_for_update(trans, tenant_id_, task_id_, table_task_status, table_execution_id))) { - LOG_WARN("select for update failed", K(ret), K(task_id_)); + LOG_WARN("select for update failed", K(ret), K(task_id)); } else if (OB_FAIL(ObDDLTaskRecordOperator::update_execution_id(trans, tenant_id_, task_id_, table_execution_id + 1))) { LOG_WARN("update task status failed", K(ret)); } else { @@ -2065,5 +2070,97 @@ int ObDDLTaskRecordOperator::select_for_update( return ret; } + +int ObDDLTaskRecordOperator::kill_inner_sql( + common::ObMySQLProxy &proxy, + const uint64_t tenant_id, + const uint64_t session_id) +{ + int ret = OB_SUCCESS; + ObSqlString sql_string; + int64_t affected_rows = 0; + + LOG_INFO("start to kill inner sql", K(session_id), K(tenant_id)); + if (OB_UNLIKELY(session_id <= 0 || tenant_id <= 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(session_id), K(tenant_id)); + } else if (OB_FAIL(sql_string.assign_fmt("KILL %ld", session_id))) { + LOG_WARN("assign sql string failed", K(ret), K(session_id)); + } else if (OB_FAIL(proxy.write(tenant_id, sql_string.ptr(), affected_rows))) { + LOG_WARN("KILL session failed", K(ret), K(tenant_id), K(session_id)); + } else if (OB_UNLIKELY(affected_rows < 0)) { // kill session affected_rows is 0 + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected affected_rows", K(ret), K(affected_rows)); + } + return ret; +} + +int ObDDLTaskRecordOperator::kill_task_inner_sql( + common::ObMySQLProxy &proxy, + const common::ObCurTraceId::TraceId &trace_id, + const uint64_t tenant_id, + const common::ObAddr &sql_exec_addr) +{ + int ret = OB_SUCCESS; + char ip_str[common::OB_IP_STR_BUFF]; + + if (OB_UNLIKELY(!proxy.is_inited()) || !sql_exec_addr.is_valid()) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(proxy.is_inited())); + } else if (!sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ip to string failed", K(ret), K(sql_exec_addr)); + } else { + LOG_INFO("start ddl kill inner sql session", K(ret), K(trace_id)); + ObSqlString sql_string; + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + sqlclient::ObMySQLResult *result = NULL; + char trace_id_str[64] = { 0 }; + char spec_charater = '%'; + if (OB_UNLIKELY(0 > trace_id.to_string(trace_id_str, sizeof(trace_id_str)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get trace id string failed", K(ret), K(trace_id), K(tenant_id)); + } else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\"" + " and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%cINTO%cSELECT%c\" ", + OB_ALL_VIRTUAL_SESSION_INFO_TNAME, + trace_id_str, + ip_str, + sql_exec_addr.get_port(), + spec_charater, + spec_charater, + spec_charater, + spec_charater))) { + LOG_WARN("assign sql string failed", K(ret)); + } else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &sql_exec_addr))) { // default use OB_SYS_TENANT_ID + LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); + } else if (OB_ISNULL((result = res.get_result()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(ret), KP(result)); + } else { + uint64_t session_id = 0; + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next row", K(ret)); + } + } else { + EXTRACT_UINT_FIELD_MYSQL(*result, "session_id", session_id, uint64_t); + if (OB_FAIL(kill_inner_sql(proxy, tenant_id, session_id))){ + LOG_WARN("fail to kill session", K(ret), K(session_id), K(trace_id)); + } else { + LOG_WARN("succ to kill session", K(ret), K(session_id), K(trace_id)); + } + } + } + } + } + } + return ret; +} + + } // end namespace rootserver } // end namespace oceanbase diff --git a/src/rootserver/ddl_task/ob_ddl_task.h b/src/rootserver/ddl_task/ob_ddl_task.h index 8240299a6a..90bdd72614 100644 --- a/src/rootserver/ddl_task/ob_ddl_task.h +++ b/src/rootserver/ddl_task/ob_ddl_task.h @@ -182,6 +182,12 @@ public: static int to_hex_str(const ObString &src, ObSqlString &dst); + static int kill_task_inner_sql( + common::ObMySQLProxy &proxy, + const common::ObCurTraceId::TraceId &trace_id, + const uint64_t tenant_id, + const common::ObAddr &sql_exec_addr); + private: static int fill_task_record( const common::sqlclient::ObMySQLResult *result_row, @@ -189,6 +195,11 @@ private: ObDDLTaskRecord &task_record); static int64_t get_record_id(share::ObDDLType ddl_type, int64_t origin_id); + + static int kill_inner_sql( + common::ObMySQLProxy &proxy, + const uint64_t tenant_id, + const uint64_t session_id); }; class ObDDLWaitTransEndCtx @@ -263,7 +274,7 @@ public: target_object_id_(0), task_status_(share::ObDDLTaskStatus::PREPARE), snapshot_version_(0), ret_code_(OB_SUCCESS), task_id_(0), parent_task_id_(0), parent_task_key_(), task_version_(0), parallelism_(0), allocator_(lib::ObLabel("DdlTask")), compat_mode_(lib::Worker::CompatMode::INVALID), err_code_occurence_cnt_(0), - delay_schedule_time_(0), next_schedule_ts_(0), execution_id_(0) + delay_schedule_time_(0), next_schedule_ts_(0), execution_id_(0), sql_exec_addr_() {} virtual ~ObDDLTask() {} virtual int process() = 0; @@ -310,11 +321,12 @@ public: const int64_t snapshot_version, const common::ObIArray &tablet_ids); void set_sys_task_id(const TraceId &sys_task_id) { sys_task_id_ = sys_task_id; } + void set_sql_exec_addr(const common::ObAddr &addr) { sql_exec_addr_ = addr; } const TraceId &get_sys_task_id() const { return sys_task_id_; } void calc_next_schedule_ts(int ret_code); bool need_schedule() { return next_schedule_ts_ <= ObTimeUtility::current_time(); } - int push_execution_id(); bool is_replica_build_need_retry(const int ret_code); + int push_execution_id(); #ifdef ERRSIM int check_errsim_error(); #endif @@ -324,7 +336,7 @@ public: K(target_object_id_), K(task_status_), K(snapshot_version_), K_(ret_code), K_(task_id), K_(parent_task_id), K_(parent_task_key), K_(task_version), K_(parallelism), K_(ddl_stmt_str), K_(compat_mode), - K_(sys_task_id), K_(err_code_occurence_cnt), K_(next_schedule_ts), K_(delay_schedule_time), K(execution_id_)); + K_(sys_task_id), K_(err_code_occurence_cnt), K_(next_schedule_ts), K_(delay_schedule_time), K(execution_id_), K(sql_exec_addr_)); protected: int check_is_latest_execution_id(const int64_t execution_id, bool &is_latest); virtual bool is_error_need_retry(const int ret_code) @@ -359,6 +371,7 @@ protected: int64_t delay_schedule_time_; int64_t next_schedule_ts_; int64_t execution_id_; + common::ObAddr sql_exec_addr_; }; enum ColChecksumStat diff --git a/src/rootserver/ddl_task/ob_index_build_task.cpp b/src/rootserver/ddl_task/ob_index_build_task.cpp index 5255b8f349..ff830ff465 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.cpp +++ b/src/rootserver/ddl_task/ob_index_build_task.cpp @@ -59,6 +59,8 @@ int ObIndexSSTableBuildTask::process() ObTabletID unused_tablet_id; const ObTableSchema *table_schema = nullptr; bool need_padding = false; + bool need_exec_new_inner_sql = true; + if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard( tenant_id_, schema_guard))) { LOG_WARN("fail to get tenant schema guard", K(ret), K(data_table_id_)); @@ -78,8 +80,18 @@ int ObIndexSSTableBuildTask::process() } else if (nullptr == table_schema) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, table schema must not be nullptr", K(ret)); - } else if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, - data_table_id_, + } else { + (void)ObCheckTabletDataComplementOp::check_and_wait_old_complement_task(tenant_id_, dest_table_id_, + inner_sql_exec_addr_, + trace_id_, + table_schema->get_schema_version(), + snapshot_version_, + need_exec_new_inner_sql); + if (!need_exec_new_inner_sql) { + LOG_INFO("succ to wait and complete old task finished!", K(ret)); + } else if (OB_FAIL(root_service_->get_ddl_scheduler().on_update_execution_id(task_id_, execution_id_))) { // genenal new ObIndexSSTableBuildTask::execution_id_ and persist to inner table + LOG_WARN("failed to update execution id", K(ret)); + } else if (OB_FAIL(ObDDLUtil::generate_build_replica_sql(tenant_id_, data_table_id_, dest_table_id_, table_schema->get_schema_version(), snapshot_version_, @@ -90,39 +102,46 @@ int ObIndexSSTableBuildTask::process() !table_schema->is_user_hidden_table()/*use_schema_version_hint_for_src_table*/, nullptr, sql_string))) { - LOG_WARN("fail to generate build replica sql", K(ret)); - } else if (OB_FAIL(table_schema->is_need_padding_for_generated_column(need_padding))) { - LOG_WARN("fail to check need padding", K(ret)); - } else { - common::ObCommonSqlProxy *user_sql_proxy = nullptr; - int64_t affected_rows = 0; - ObSQLMode sql_mode = SMO_STRICT_ALL_TABLES | (need_padding ? SMO_PAD_CHAR_TO_FULL_LENGTH : 0); - ObSessionParam session_param; - session_param.sql_mode_ = (int64_t *)&sql_mode; - session_param.tz_info_wrap_ = nullptr; - session_param.ddl_info_.set_is_ddl(true); - session_param.ddl_info_.set_source_table_hidden(table_schema->is_user_hidden_table()); - session_param.ddl_info_.set_dest_table_hidden(table_schema->is_user_hidden_table()); - session_param.nls_formats_[ObNLSFormatEnum::NLS_DATE] = nls_date_format_; - session_param.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP] = nls_timestamp_format_; - session_param.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ] = nls_timestamp_tz_format_; - int tmp_ret = OB_SUCCESS; - if (oracle_mode) { - user_sql_proxy = GCTX.ddl_oracle_sql_proxy_; + LOG_WARN("fail to generate build replica sql", K(ret)); + } else if (OB_FAIL(table_schema->is_need_padding_for_generated_column(need_padding))) { + LOG_WARN("fail to check need padding", K(ret)); } else { - user_sql_proxy = GCTX.ddl_sql_proxy_; - } + common::ObCommonSqlProxy *user_sql_proxy = nullptr; + int64_t affected_rows = 0; + ObSQLMode sql_mode = SMO_STRICT_ALL_TABLES | (need_padding ? SMO_PAD_CHAR_TO_FULL_LENGTH : 0); + ObSessionParam session_param; + session_param.sql_mode_ = (int64_t *)&sql_mode; + session_param.tz_info_wrap_ = nullptr; + session_param.ddl_info_.set_is_ddl(true); + session_param.ddl_info_.set_source_table_hidden(table_schema->is_user_hidden_table()); + session_param.ddl_info_.set_dest_table_hidden(table_schema->is_user_hidden_table()); + session_param.nls_formats_[ObNLSFormatEnum::NLS_DATE] = nls_date_format_; + session_param.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP] = nls_timestamp_format_; + session_param.nls_formats_[ObNLSFormatEnum::NLS_TIMESTAMP_TZ] = nls_timestamp_tz_format_; + session_param.use_external_session_ = true; // means session id dispatched by session mgr - DEBUG_SYNC(BEFORE_INDEX_SSTABLE_BUILD_TASK_SEND_SQL); - ObTimeoutCtx timeout_ctx; - LOG_INFO("execute sql" , K(sql_string), K(data_table_id_), K(tenant_id_)); - if (OB_FAIL(timeout_ctx.set_trx_timeout_us(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { - LOG_WARN("set trx timeout failed", K(ret)); - } else if (OB_FAIL(timeout_ctx.set_timeout(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { - LOG_WARN("set timeout failed", K(ret)); - } else if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows, - oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param))) { - LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_)); + common::ObAddr *sql_exec_addr = nullptr; + if (inner_sql_exec_addr_.is_valid()) { + sql_exec_addr = &inner_sql_exec_addr_; + LOG_INFO("inner sql execute addr" , K(*sql_exec_addr)); + } + int tmp_ret = OB_SUCCESS; + if (oracle_mode) { + user_sql_proxy = GCTX.ddl_oracle_sql_proxy_; + } else { + user_sql_proxy = GCTX.ddl_sql_proxy_; + } + DEBUG_SYNC(BEFORE_INDEX_SSTABLE_BUILD_TASK_SEND_SQL); + ObTimeoutCtx timeout_ctx; + LOG_INFO("execute sql" , K(sql_string), K(data_table_id_), K(tenant_id_)); + if (OB_FAIL(timeout_ctx.set_trx_timeout_us(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { + LOG_WARN("set trx timeout failed", K(ret)); + } else if (OB_FAIL(timeout_ctx.set_timeout(OB_MAX_DDL_SINGLE_REPLICA_BUILD_TIMEOUT))) { + LOG_WARN("set timeout failed", K(ret)); + } else if (OB_FAIL(user_sql_proxy->write(tenant_id_, sql_string.ptr(), affected_rows, + oracle_mode ? ObCompatibilityMode::ORACLE_MODE : ObCompatibilityMode::MYSQL_MODE, &session_param, sql_exec_addr))) { + LOG_WARN("fail to execute build replica sql", K(ret), K(tenant_id_)); + } } } @@ -153,7 +172,8 @@ ObAsyncTask *ObIndexSSTableBuildTask::deep_copy(char *buf, const int64_t buf_siz execution_id_, trace_id_, parallelism_, - root_service_); + root_service_, + inner_sql_exec_addr_); } return task; } @@ -291,6 +311,11 @@ int ObIndexBuildTask::init( if (ObDDLTaskStatus::VALIDATE_CHECKSUM == task_status) { sstable_complete_ts_ = ObTimeUtility::current_time(); } + if (OB_FAIL(ObDDLUtil::get_sys_ls_leader_addr(GCONF.cluster_id, tenant_id_, create_index_arg_.inner_sql_exec_addr_))) { + LOG_WARN("get sys ls leader addr fail", K(ret)); + ret = OB_SUCCESS; // ingore ret + } + set_sql_exec_addr(create_index_arg_.inner_sql_exec_addr_); // set to switch_status, if task cancel, we should kill session with inner_sql_exec_addr_ task_id_ = task_id; parent_task_id_ = parent_task_id; task_version_ = OB_INDEX_BUILD_TASK_VERSION; @@ -336,6 +361,7 @@ int ObIndexBuildTask::init(const ObDDLTaskRecord &task_record) ret = OB_TABLE_NOT_EXIST; LOG_WARN("fail to get table schema", K(ret)); } else { + set_sql_exec_addr(create_index_arg_.inner_sql_exec_addr_); // set to switch_status, if task cancel, we should kill session with inner_sql_exec_addr_ is_global_index_ = index_schema->is_global_index_table(); is_unique_index_ = index_schema->is_unique_index(); tenant_id_ = task_record.tenant_id_; @@ -650,7 +676,8 @@ int ObIndexBuildTask::send_build_single_replica_request() execution_id_, trace_id_, parallelism_, - root_service_); + root_service_, + create_index_arg_.inner_sql_exec_addr_); if (OB_FAIL(task.set_nls_format(create_index_arg_.nls_date_format_, create_index_arg_.nls_timestamp_format_, create_index_arg_.nls_timestamp_tz_format_))) { @@ -712,9 +739,7 @@ int ObIndexBuildTask::wait_data_complement() // submit a job to complete sstable for the index table on snapshot_version if (OB_SUCC(ret) && !state_finished && !is_sstable_complete_task_submitted_) { - if (OB_FAIL(push_execution_id())) { - LOG_WARN("failed to push execution id", K(ret)); - } else if (OB_FAIL(send_build_single_replica_request())) { + if (OB_FAIL(send_build_single_replica_request())) { LOG_WARN("fail to send build single replica request", K(ret)); } } @@ -953,8 +978,9 @@ int ObIndexBuildTask::update_complete_sstable_job_status( } else { complete_sstable_job_ret_code_ = ret_code; sstable_complete_ts_ = ObTimeUtility::current_time(); + execution_id_ = execution_id; // update ObIndexBuildTask::execution_id_ from ObIndexSSTableBuildTask::execution_id_ } - LOG_INFO("update complete sstable job return code", K(ret), K(tablet_id), K(snapshot_version), K(ret_code)); + LOG_INFO("update complete sstable job return code", K(ret), K(tablet_id), K(snapshot_version), K(ret_code), K(execution_id_)); return ret; } @@ -1240,4 +1266,3 @@ int64_t ObIndexBuildTask::get_serialize_param_size() const return create_index_arg_.get_serialize_size() + serialization::encoded_length_i64(check_unique_snapshot_) + serialization::encoded_length_i64(task_version_) + serialization::encoded_length_i64(parallelism_); } - diff --git a/src/rootserver/ddl_task/ob_index_build_task.h b/src/rootserver/ddl_task/ob_index_build_task.h index 54e44d9eb4..e3264ea7a0 100644 --- a/src/rootserver/ddl_task/ob_index_build_task.h +++ b/src/rootserver/ddl_task/ob_index_build_task.h @@ -33,11 +33,13 @@ public: const int64_t execution_id, const common::ObCurTraceId::TraceId &trace_id, const int64_t parallelism, - ObRootService *root_service) + ObRootService *root_service, + const common::ObAddr &inner_sql_exec_addr) : task_id_(task_id), tenant_id_(tenant_id), data_table_id_(data_table_id), dest_table_id_(dest_table_id), schema_version_(schema_version), snapshot_version_(snapshot_version), execution_id_(execution_id), trace_id_(trace_id), parallelism_(parallelism), allocator_("IdxSSTBuildTask"), - root_service_(root_service) + root_service_(root_service), + inner_sql_exec_addr_(inner_sql_exec_addr) { set_retry_times(0); } @@ -68,6 +70,7 @@ private: ObString nls_timestamp_format_; ObString nls_timestamp_tz_format_; ObRootService *root_service_; + common::ObAddr inner_sql_exec_addr_; DISALLOW_COPY_AND_ASSIGN(ObIndexSSTableBuildTask); }; diff --git a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp index 9efed4e7db..7f63cc3c20 100644 --- a/src/rootserver/ddl_task/ob_table_redefinition_task.cpp +++ b/src/rootserver/ddl_task/ob_table_redefinition_task.cpp @@ -56,6 +56,10 @@ int ObTableRedefinitionTask::init(const uint64_t tenant_id, const int64_t task_i } else if (OB_FAIL(set_ddl_stmt_str(alter_table_arg_.ddl_stmt_str_))) { LOG_WARN("set ddl stmt str failed", K(ret)); } else { + if (OB_FAIL(ObDDLUtil::get_sys_ls_leader_addr(GCONF.cluster_id, tenant_id, alter_table_arg_.inner_sql_exec_addr_))) { + ret = OB_SUCCESS; // ignore ret + } + set_sql_exec_addr(alter_table_arg_.inner_sql_exec_addr_); // set to switch_status, if task cancel, we should kill session with inner_sql_exec_addr_ task_type_ = ddl_type; object_id_ = data_table_id; target_object_id_ = dest_table_id; @@ -90,6 +94,7 @@ int ObTableRedefinitionTask::init(const ObDDLTaskRecord &task_record) } else if (OB_FAIL(set_ddl_stmt_str(task_record.ddl_stmt_str_))) { LOG_WARN("set ddl stmt str failed", K(ret)); } else { + set_sql_exec_addr(alter_table_arg_.inner_sql_exec_addr_); // set to switch_status, if task cancel, we should kill session with inner_sql_exec_addr_ task_id_ = task_record.task_id_; task_type_ = task_record.ddl_type_; object_id_ = data_table_id; @@ -129,7 +134,8 @@ int ObTableRedefinitionTask::update_complete_sstable_job_status(const common::Ob LOG_INFO("receive a mismatch execution result, ignore", K(execution_id), K(execution_id_)); } else { complete_sstable_job_ret_code_ = ret_code; - LOG_INFO("table redefinition task callback", K(complete_sstable_job_ret_code_)); + execution_id_ = execution_id; // update ObTableRedefinitionTask::execution_id_ from ObDDLRedefinitionSSTableBuildTask::execution_id_ + LOG_INFO("table redefinition task callback", K(complete_sstable_job_ret_code_), K(execution_id_)); } return ret; } @@ -161,12 +167,13 @@ int ObTableRedefinitionTask::send_build_replica_request() target_object_id_, schema_version_, snapshot_version_, - execution_id_, + execution_id_, // will init in ObDDLRedefinitionSSTableBuildTask::process sql_mode, trace_id_, parallelism_, use_heap_table_ddl_plan, - GCTX.root_service_); + GCTX.root_service_, + alter_table_arg_.inner_sql_exec_addr_); if (OB_FAIL(root_service->get_ddl_service().get_tenant_schema_guard_with_version_in_inner_table(tenant_id_, schema_guard))) { LOG_WARN("get schema guard failed", K(ret)); } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id_, object_id_, orig_table_schema))) { @@ -254,9 +261,7 @@ int ObTableRedefinitionTask::table_redefinition(const ObDDLTaskStatus next_task_ } if (OB_SUCC(ret) && !is_build_replica_end && 0 == build_replica_request_time_) { - if (OB_FAIL(push_execution_id())) { - LOG_WARN("failed to push execution id", K(ret)); - } else if (OB_FAIL(send_build_replica_request())) { + if (OB_FAIL(send_build_replica_request())) { LOG_WARN("fail to send build replica request", K(ret)); } else { build_replica_request_time_ = ObTimeUtility::current_time(); diff --git a/src/rootserver/ob_rs_async_rpc_proxy.h b/src/rootserver/ob_rs_async_rpc_proxy.h index d6b07adc85..24eab39fba 100644 --- a/src/rootserver/ob_rs_async_rpc_proxy.h +++ b/src/rootserver/ob_rs_async_rpc_proxy.h @@ -62,6 +62,8 @@ RPC_F(obrpc::OB_BATCH_GET_TABLET_AUTOINC_SEQ, obrpc::ObBatchGetTabletAutoincSeqA obrpc::ObBatchGetTabletAutoincSeqRes, ObBatchGetTabletAutoincSeqProxy); RPC_F(obrpc::OB_BATCH_SET_TABLET_AUTOINC_SEQ, obrpc::ObBatchSetTabletAutoincSeqArg, obrpc::ObBatchSetTabletAutoincSeqRes, ObBatchSetTabletAutoincSeqProxy); +RPC_F(obrpc::OB_DDL_CHECK_TABLET_MERGE_STATUS, obrpc::ObDDLCheckTabletMergeStatusArg, + obrpc::ObDDLCheckTabletMergeStatusResult, ObCheckTabletMergeStatusProxy); }//end namespace rootserver }//end namespace oceanbase diff --git a/src/share/ob_ddl_common.cpp b/src/share/ob_ddl_common.cpp index 91c11a2b84..d5bc23c5f5 100644 --- a/src/share/ob_ddl_common.cpp +++ b/src/share/ob_ddl_common.cpp @@ -27,6 +27,7 @@ #include "storage/tablet/ob_tablet.h" #include "storage/tx_storage/ob_ls_handle.h" #include "storage/tx_storage/ob_ls_map.h" +#include "rootserver/ob_root_service.h" using namespace oceanbase::share; using namespace oceanbase::common; @@ -707,7 +708,7 @@ int ObDDLUtil::get_tablet_leader_addr( } // Used in offline ddl to delete all checksum record in __all_ddl_checksum -// DELETE FROM __all_ddl_checksum WHERE +// DELETE FROM __all_ddl_checksum WHERE int ObDDLUtil::clear_ddl_checksum(ObPhysicalPlan *phy_plan) { int ret = OB_SUCCESS; @@ -779,3 +780,614 @@ bool ObDDLUtil::need_remote_write(const int ret_code) || OB_LS_LOCATION_LEADER_NOT_EXIST == ret_code || OB_EAGAIN == ret_code; } + +int ObDDLUtil::get_tablet_paxos_member_list( + const uint64_t tenant_id, + const common::ObTabletID &tablet_id, + common::ObIArray &paxos_server_list, + int64_t &paxos_member_count) +{ + int ret = OB_SUCCESS; + ObLSLocation location; + paxos_member_count = 0; + if (OB_INVALID_TENANT_ID == tenant_id || !tablet_id.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get tablet replica location, invalid id", K(ret), K(tenant_id), K(tablet_id)); + } else if (OB_FAIL(get_tablet_replica_location(tenant_id, tablet_id, location))) { + LOG_WARN("fail to get tablet replica location", K(tenant_id), K(tablet_id), K(ret)); + } else { + const ObIArray &ls_locations = location.get_replica_locations(); + for (int64_t i = 0; i < ls_locations.count() && OB_SUCC(ret); ++i) { + common::ObReplicaType replica_type = ls_locations.at(i).get_replica_type(); + if (REPLICA_TYPE_FULL != replica_type && REPLICA_TYPE_LOGONLY != replica_type) { + continue; + } + paxos_member_count++; + if (REPLICA_TYPE_FULL == replica_type) { // paxos replica + const ObAddr &server = ls_locations.at(i).get_server(); + if (!has_exist_in_array(paxos_server_list, server) && OB_FAIL(paxos_server_list.push_back(server))) { + LOG_WARN("fail to push back addr", K(ret), K(server)); + } + } + } + } + return ret; +} + +int ObDDLUtil::get_tablet_replica_location( + const uint64_t tenant_id, + const common::ObTabletID &tablet_id, + ObLSLocation &location) +{ + int ret = OB_SUCCESS; + const int64_t cluster_id = GCONF.cluster_id; + share::ObLSID ls_id; + int64_t expire_renew_time = INT64_MAX; + bool is_cache_hit = false; + if (OB_UNLIKELY(nullptr == GCTX.location_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("location service ptr is null", K(ret)); + } else if (tenant_id == OB_INVALID_TENANT_ID || !tablet_id.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get tablet replica location, invalid id", K(ret), K(tenant_id), K(tablet_id)); + } else if (OB_FAIL(GCTX.location_service_->get(tenant_id, + tablet_id, + INT64_MAX, + is_cache_hit, + ls_id))) { + LOG_WARN("fail to get ls id according to tablet_id", K(ret), K(tenant_id), K(tablet_id)); + } else if (OB_FAIL(GCTX.location_service_->get(cluster_id, + tenant_id, + ls_id, + expire_renew_time, + is_cache_hit, + location))) { + LOG_WARN("fail to get ls location", K(ret), K(cluster_id), K(tenant_id), K(ls_id), K(tablet_id)); + } + return ret; +} + +int ObDDLUtil::get_sys_ls_leader_addr( + const uint64_t cluster_id, + const uint64_t tenant_id, + common::ObAddr &leader_addr) +{ + int ret = OB_SUCCESS; + bool force_renew = false; + share::ObLocationService *location_service = nullptr; + share::ObLSID ls_id(share::ObLSID::SYS_LS_ID); + + if (OB_ISNULL(location_service = GCTX.location_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to check and wait old completement, null pointer. ", K(ret)); + } else if (OB_FAIL(location_service->get_leader(cluster_id, + tenant_id, + ls_id, + force_renew, + leader_addr))) { + LOG_WARN("failed to get ls_leader", K(ret)); + } else if (OB_UNLIKELY(!leader_addr.is_valid())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("leader addr is invalid", K(ret), K(tenant_id), K(leader_addr), K(cluster_id)); + } else { + LOG_INFO("succ to get ls leader addr", K(cluster_id), K(tenant_id), K(leader_addr)); + } + return ret; +} + +/****************** ObCheckTabletDataComplementOp *************/ + +int ObCheckTabletDataComplementOp::check_task_inner_sql_session_status( + const common::ObAddr &inner_sql_exec_addr, + const common::ObCurTraceId::TraceId &trace_id, + const uint64_t tenant_id, + const int64_t schema_version, + const int64_t scn, + bool &is_old_task_session_exist) +{ + int ret = OB_SUCCESS; + is_old_task_session_exist = false; + char ip_str[common::OB_IP_STR_BUFF]; + rootserver::ObRootService *root_service = nullptr; + + if (OB_ISNULL(root_service = GCTX.root_service_)) { + ret = OB_ERR_SYS; + LOG_WARN("fail to get sql proxy, root service is null.!"); + } else if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || trace_id.is_invalid() || !inner_sql_exec_addr.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tenant_id), K(trace_id), K(inner_sql_exec_addr)); + } else if (!inner_sql_exec_addr.ip_to_string(ip_str, sizeof(ip_str))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ip to string failed", K(ret), K(inner_sql_exec_addr)); + } else { + ret = OB_SUCCESS; + common::ObMySQLProxy &proxy = root_service->get_sql_proxy(); + ObSqlString sql_string; + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + sqlclient::ObMySQLResult *result = NULL; + char trace_id_str[64] = { 0 }; + char charater = '%'; + if (OB_UNLIKELY(0 > trace_id.to_string(trace_id_str, sizeof(trace_id_str)))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get trace id string failed", K(ret), K(trace_id)); + } else if (OB_FAIL(sql_string.assign_fmt(" SELECT id as session_id FROM %s WHERE trace_id = \"%s\" " + " and svr_ip = \"%s\" and svr_port = %d and info like \"%cINSERT%cINTO%cSELECT%c%ld%c%ld%c\" ", + OB_ALL_VIRTUAL_SESSION_INFO_TNAME, + trace_id_str, + ip_str, + inner_sql_exec_addr.get_port(), + charater, + charater, + charater, + charater, + schema_version, + charater, + scn, + charater ))) { + LOG_WARN("assign sql string failed", K(ret)); + } else if (OB_FAIL(proxy.read(res, OB_SYS_TENANT_ID, sql_string.ptr(), &inner_sql_exec_addr))) { + LOG_WARN("query ddl task record failed", K(ret), K(sql_string)); + } else if (OB_ISNULL((result = res.get_result()))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to get sql result", K(ret), KP(result)); + } else { + uint64_t session_id = 0; + while (OB_SUCC(ret)) { + if (OB_FAIL(result->next())) { + if (OB_ITER_END == ret) { + LOG_INFO("success to get result, and no inner sql task", K(ret), K(sql_string.ptr()), + K(ip_str), K(trace_id_str), K(tenant_id), K(sql_string)); + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next row", K(ret)); + } + } else { + is_old_task_session_exist = true; + EXTRACT_UINT_FIELD_MYSQL(*result, "session_id", session_id, uint64_t); + LOG_INFO("succ to match inner sql session in trace id", K(ret), K(sql_string.ptr()), + K(session_id), K(tenant_id), K(ip_str), K(trace_id_str), K(sql_string)); + } + } + } + } + } + return ret; +} + +int ObCheckTabletDataComplementOp::update_replica_merge_status( + const ObTabletID &tablet_id, + const int merge_status, + hash::ObHashMap &tablets_commited_map) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!tablet_id.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("update replica merge status fail.", K(ret)); + } else { + int32_t commited_count = 0; + if (OB_SUCC(tablets_commited_map.get_refactored(tablet_id, commited_count))) { + // overwrite + if (true == merge_status) { + commited_count++; + if (OB_FAIL(tablets_commited_map.set_refactored(tablet_id, commited_count, true /* overwrite */))) { + LOG_WARN("fail to insert map status", K(ret)); + } + } + } else if (OB_HASH_NOT_EXIST == ret) { // new insert + ret = OB_SUCCESS; + if (true == merge_status) { + commited_count = 1; + if (OB_FAIL(tablets_commited_map.set_refactored(tablet_id, commited_count, true /* overwrite */))) { + LOG_WARN("fail to insert map status", K(ret)); + } + } + } else { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to update replica merge status", K(ret)); + } + LOG_INFO("success to update replica merge status.", K(tablet_id), K(merge_status)); + } + return ret; +} + + +// only get un-merge tablet replica ip addr +int ObCheckTabletDataComplementOp::construct_tablet_ip_map( + const uint64_t tenant_id, + const ObTabletID &tablet_id, + hash::ObHashMap> &ip_tablets_map) +{ + int ret = OB_SUCCESS; + common::ObArray paxos_server_list; + common::ObArray unfinished_replica_addrs; + common::ObArray tablet_array; + int64_t paxos_member_count; // unused, but need + + if (!tablet_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tablet_id), K(tenant_id)); + } else if (OB_FAIL(ObDDLUtil::get_tablet_paxos_member_list(tenant_id, tablet_id, paxos_server_list, paxos_member_count))) { + LOG_WARN("fail to get tablet replica location!", K(ret), K(tablet_id)); + } else { + // classify un-merge tablet addr and tablet ids + for (int64_t i = 0; OB_SUCC(ret) && i < paxos_server_list.count(); ++i) { + tablet_array.reset(); + const ObAddr & addr = paxos_server_list.at(i); + if (OB_FAIL(ip_tablets_map.get_refactored(addr, tablet_array))) { + if (OB_HASH_NOT_EXIST == ret) { // first time + ret = OB_SUCCESS; + if (OB_FAIL(tablet_array.push_back(tablet_id))) { + LOG_WARN("fail to push back to array", K(ret), K(tablet_id)); + } else if (OB_FAIL(ip_tablets_map.set_refactored(addr, tablet_array, true/* overwrite */))) { + LOG_WARN("set ip tablet map fail.", K(ret), K(tablet_id), K(addr)); + } + } else { + LOG_WARN("get ip tablet from map fail.", K(ret), K(tablet_id), K(addr)); + } + } else if (OB_FAIL(tablet_array.push_back(tablet_id))) { + LOG_WARN("fail to push back to array", K(ret), K(tablet_id)); + } else if (OB_FAIL(ip_tablets_map.set_refactored(addr, tablet_array, true/* overwrite */))) { + LOG_WARN("set ip tablet map fail.", K(ret), K(tablet_id), K(addr)); + } + } + } + + return ret; +} + +int ObCheckTabletDataComplementOp::construct_ls_tablet_map( + const uint64_t tenant_id, + const common::ObTabletID &tablet_id, + hash::ObHashMap> &ls_tablets_map) +{ + int ret = OB_SUCCESS; + bool is_cache_hit = false; + share::ObLSID ls_id; + common::ObArray tablet_array; + + if (!tablet_id.is_valid() || OB_INVALID_TENANT_ID == tenant_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tablet_id), K(tenant_id)); + } else if (OB_FAIL(GCTX.location_service_->get(tenant_id, + tablet_id, + INT64_MAX, + is_cache_hit, /*is_cache_hit*/ + ls_id))) { + LOG_WARN("fail to get ls id according to tablet_id", K(ret), K(tenant_id), K(tablet_id)); + } else if (OB_FAIL(ls_tablets_map.get_refactored(ls_id, tablet_array))) { + if (OB_HASH_NOT_EXIST == ret) { // first time + ret = OB_SUCCESS; + if (OB_FAIL(tablet_array.push_back(tablet_id))) { + LOG_WARN("fail to push back to array", K(ret), K(tablet_id)); + } else if (OB_FAIL(ls_tablets_map.set_refactored(ls_id, tablet_array, false))) { + LOG_WARN("ls_tablets_map set fail", K(ret), K(tablet_id), K(ls_id)); + } + } else { + LOG_WARN("ls_tablets_map get fail", K(ret), K(tablet_id), K(ls_id)); + } + } else if (OB_FAIL(tablet_array.push_back(tablet_id))) { + LOG_WARN("fail to push back to array", K(ret), K(tablet_id)); + } else if (OB_FAIL(ls_tablets_map.set_refactored(ls_id, tablet_array, true /* overwrite */))) { + LOG_WARN("ls_tablets_map set fail", K(ret), K(tablet_id), K(ls_id)); + } + + return ret; +} + +int ObCheckTabletDataComplementOp::calculate_build_finish( + const uint64_t tenant_id, + const common::ObIArray &tablet_ids, + hash::ObHashMap &tablets_commited_map, + int64_t &build_succ_count) +{ + int ret = OB_SUCCESS; + common::ObArray paxos_server_list; // unused + int64_t paxos_member_count = 0; + + build_succ_count = 0; + + if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fail to check tablets commit status", K(ret), K(tenant_id)); + } else if (tablets_commited_map.size() <= 0) { + // do nothing + } else { + int commited_count = 0; + for (int64_t tablet_idx = 0; OB_SUCC(ret) && tablet_idx < tablet_ids.count(); ++tablet_idx) { + common::ObTabletID tablet_id = tablet_ids.at(tablet_idx); + if (OB_FAIL(ObDDLUtil::get_tablet_paxos_member_list(tenant_id, + tablet_id, + paxos_server_list, + paxos_member_count))) { + LOG_WARN("fail to get tablet paxos member list.", + K(ret), K(tenant_id), K(tablet_id), K(paxos_server_list), K(paxos_member_count)); + } else if (paxos_member_count == 0) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to check task tablet, unexpected!", + K(ret), K(paxos_member_count), K(tablet_id), K(tenant_id)); + } else if (OB_FAIL(tablets_commited_map.get_refactored(tablet_id, commited_count))){ + LOG_WARN("fail to get tablet commited map, unexpected!", K(ret), K(tablet_id)); + } else if (commited_count < ((paxos_member_count >> 1) + 1)) { // not finished majority + // do nothing + } else { + build_succ_count++; + } + + } + LOG_INFO("succ check and commit count", K(build_succ_count)); + } + return ret; +} + + +int ObCheckTabletDataComplementOp::do_check_tablets_merge_status( + const uint64_t tenant_id, + const int64_t snapshot_version, + const ObIArray &tablet_ids, + const ObLSID &ls_id, + hash::ObHashMap> &ip_tablets_map, + hash::ObHashMap &tablets_commited_map, + int64_t &tablet_build_succ_count) +{ + int ret = OB_SUCCESS; + obrpc::ObSrvRpcProxy *rpc_proxy = GCTX.srv_rpc_proxy_; + ip_tablets_map.reuse(); + tablets_commited_map.reuse(); + + tablet_build_succ_count = 0; + + if (OB_UNLIKELY(tablet_ids.count() < 0 || OB_INVALID_TENANT_ID == tenant_id || OB_INVALID_TIMESTAMP == snapshot_version) || + OB_ISNULL(rpc_proxy)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tablet_ids.count()), K(tenant_id), K(snapshot_version), K(rpc_proxy)); + } else { + rootserver::ObCheckTabletMergeStatusProxy proxy(*rpc_proxy, + &obrpc::ObSrvRpcProxy::check_ddl_tablet_merge_status); + obrpc::ObDDLCheckTabletMergeStatusArg arg; + const int64_t rpc_timeout = max(GCONF.rpc_timeout, 1000L * 1000L * 9L); + + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { + const ObTabletID &tablet_id = tablet_ids.at(i); + if (OB_FAIL(construct_tablet_ip_map(tenant_id, tablet_id, ip_tablets_map))) { + LOG_WARN("fail to get tablet ip addr", K(ret), K(tablet_id)); + } + } + // handle every addr tablet + for (hash::ObHashMap>::const_iterator ip_iter = ip_tablets_map.begin(); + ip_iter != ip_tablets_map.end() && OB_SUCC(ret); ++ip_iter) { + const ObAddr & dest_ip = ip_iter->first; + const ObArray &tablet_array = ip_iter->second; + if (OB_FAIL(arg.tablet_ids_.assign(tablet_array))) { + LOG_WARN("fail to get tablet ip addr", K(ret), K(tablet_array)); + } else { + arg.tenant_id_ = tenant_id; + arg.ls_id_ = ls_id; + arg.snapshot_version_ = snapshot_version; + if (OB_FAIL(proxy.call(dest_ip, rpc_timeout, tenant_id, arg))) { + LOG_WARN("send rpc failed", K(ret), K(arg), K(dest_ip), K(tenant_id)); + } + } + } + // handle batch result + int tmp_ret = OB_SUCCESS; + common::ObArray return_ret_array; + if (OB_SUCCESS != (tmp_ret = proxy.wait_all(return_ret_array))) { + LOG_WARN("rpc proxy wait failed", K(tmp_ret)); + ret = OB_SUCCESS == ret ? tmp_ret : ret; + } else if (return_ret_array.count() != ip_tablets_map.size()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc proxy rsp size not equal to send size", K(ret), + K(return_ret_array.count()), K(ip_tablets_map.size())); + } else { + const ObIArray &result_array = proxy.get_results(); + // 1. handle every ip addr result + for (int64_t i = 0; OB_SUCC(ret) && i < result_array.count(); i++) { + int return_ret = return_ret_array.at(i); // check return ret code + if (OB_SUCCESS == return_ret) { + const obrpc::ObDDLCheckTabletMergeStatusResult *cur_result = nullptr; // ip tablets status result + common::ObSArray tablet_rsp_array; + common::ObArray tablet_req_array; + const common::ObAddr &tablet_addr = proxy.get_dests().at(i); // get rpc dest addr + + if (OB_ISNULL(cur_result = result_array.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("merge status result is null.", K(ret), K(cur_result)); + } else if (FALSE_IT(tablet_rsp_array = cur_result->merge_status_)) { + } else if (OB_FAIL(ip_tablets_map.get_refactored(tablet_addr, tablet_req_array))) { + LOG_WARN("get from ip tablet map fail.", K(ret), K(tablet_addr)); + } else if (tablet_req_array.count() != tablet_rsp_array.count()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tablet req count is not equal to tablet rsp count", K(ret), K(tablet_req_array), K(tablet_rsp_array)); + } else { + // 2. handle every tablet status + for (int64_t idx = 0; OB_SUCC(ret) && idx < tablet_rsp_array.count(); ++idx) { + const common::ObTabletID &tablet_id = tablet_req_array.at(idx); // tablet id + const bool tablet_status = tablet_rsp_array.at(idx); + if (OB_FAIL(update_replica_merge_status(tablet_id, tablet_status, tablets_commited_map))) { // update tablet merge status from get + LOG_WARN("fail to update replica merge status", K(ret), K(tablet_id), K(tablet_addr)); + } else { + LOG_INFO("succ to update replica merge status", K(tablet_addr), K(tablet_id), K(tablet_status)); + } + } + } + } else { + LOG_WARN("rpc proxy return fail.", K(return_ret)); + } + } + // 3. check any commit tablet + if (OB_SUCC(ret)) { + int64_t build_succ_count = 0; + if (OB_FAIL(calculate_build_finish(tenant_id, tablet_ids, tablets_commited_map, build_succ_count))) { + LOG_WARN("check and commit tbalets commit log fail.", K(ret), K(tablet_ids), K(build_succ_count)); + } else { + DEBUG_SYNC(DDL_CHECK_TABLET_MERGE_STATUS); + tablet_build_succ_count += build_succ_count; + } + } + } + } + return ret; +} + + +int ObCheckTabletDataComplementOp::check_tablet_merge_status( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + const int64_t snapshot_version, + bool &is_all_tablets_commited) +{ + int ret = OB_SUCCESS; + is_all_tablets_commited = false; + + hash::ObHashMap> ip_tablets_map; // use for classify tablet replica addr + hash::ObHashMap> ls_tablets_map; // use for classify tablet ls + hash::ObHashMap tablets_commited_map; + + const static int64_t max_map_hash_bucket = tablet_ids.count(); + + if (OB_UNLIKELY( tablet_ids.count() <= 0 || OB_INVALID_ID == tenant_id || OB_INVALID_TIMESTAMP == snapshot_version)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(tablet_ids.count()), K(tenant_id), K(snapshot_version)); + } else if (OB_FAIL(ip_tablets_map.create(max_map_hash_bucket, "DdlTablet"))) { + LOG_WARN("fail to create ip_tablets_map", K(ret)); + } else if (OB_FAIL(ls_tablets_map.create(max_map_hash_bucket, "DdlTablet"))) { + LOG_WARN("fail to create ls_tablets_map", K(ret)); + } else if (OB_FAIL(tablets_commited_map.create(max_map_hash_bucket, "DdlTablet"))){ + LOG_WARN("fail to create tablets_commited_map", K(ret)); + } else { + const static int64_t batch_size = 100; // batch tablet number + int64_t total_build_succ_count = 0; + int64_t one_batch_build_succ_count = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { + const ObTabletID &tablet_id = tablet_ids.at(i); + if (construct_ls_tablet_map(tenant_id, tablet_id, ls_tablets_map)) { + LOG_WARN("construct_tablet_ls_map fail", K(ret), K(tenant_id), K(tablet_id)); + } else { + if ((i != 0 && i % batch_size == 0) /* reach batch size */ || i == tablet_ids.count() - 1 /* reach end */) { + for (hash::ObHashMap>::const_iterator ls_iter = ls_tablets_map.begin(); + ls_iter != ls_tablets_map.end() && OB_SUCC(ret); ++ls_iter) { + const ObLSID &ls_id = ls_iter->first; + const ObArray &tablet_array = ls_iter->second; + if (OB_FAIL(do_check_tablets_merge_status(tenant_id, + snapshot_version, + tablet_array, + ls_id, + ip_tablets_map, + tablets_commited_map, + one_batch_build_succ_count))) { + LOG_WARN("do check tablets merge status fail", K(ret)); + } else { + total_build_succ_count += one_batch_build_succ_count; + } + } + ls_tablets_map.reuse(); // reuse map + } + } + } + if (total_build_succ_count == tablet_ids.count()) { + is_all_tablets_commited = true; + } else { + int64_t total_tablets_count = tablet_ids.count(); + LOG_WARN("not all tablets finished create sstables", K(ret), K(total_build_succ_count), K(total_tablets_count)); + } + } + + ip_tablets_map.destroy(); + ls_tablets_map.destroy(); + tablets_commited_map.destroy(); + + return ret; +} + +/* + * 1. get a batch of tablets and construct a tmp ls_tablet_map + * 2. get tablet_ip_map and send async batch rpc and get results + * 3. push every tablet result to tablet_result_array + * 4. check result and find finished tablets + */ +int ObCheckTabletDataComplementOp::check_all_tablet_sstable_status( + const uint64_t tenant_id, + const uint64_t index_table_id, + const int64_t snapshot_version, + bool &is_all_sstable_build_finished) +{ + int ret = OB_SUCCESS; + ObArray dest_tablet_ids; + + is_all_sstable_build_finished = false; + + if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == index_table_id || OB_INVALID_TIMESTAMP == snapshot_version)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(index_table_id), K(snapshot_version)); + } else if (OB_FAIL(ObDDLUtil::get_tablets(tenant_id, index_table_id, dest_tablet_ids))) { + LOG_WARN("fail to get tablets", K(ret), K(tenant_id), K(index_table_id)); + } else if (OB_FAIL(check_tablet_merge_status(tenant_id, dest_tablet_ids, snapshot_version, is_all_sstable_build_finished))){ + LOG_WARN("fail to check tablet merge status.", K(ret), K(tenant_id), K(dest_tablet_ids), K(snapshot_version)); + } + return ret; +} + +/* + * This func is used to check duplicate data completement inner sql + * if has running inner sql, we should wait until finished. But + * if not has running inner sql, we should found if all tablet sstable + * has builded already. If not all builded and no inner sql running, or + * error case happen, we still execute new inner sql outside. + */ +int ObCheckTabletDataComplementOp::check_and_wait_old_complement_task( + const uint64_t tenant_id, + const uint64_t table_id, + const common::ObAddr &inner_sql_exec_addr, + const common::ObCurTraceId::TraceId &trace_id, + const int64_t schema_version, + const int64_t scn, + bool &need_exec_new_inner_sql) +{ + int ret = OB_SUCCESS; + need_exec_new_inner_sql = true; // default need execute new inner sql + bool is_old_task_session_exist = false; + bool is_all_sstable_build_finished = false; + bool need_wait = false; + + if (OB_UNLIKELY(OB_INVALID_ID == tenant_id || OB_INVALID_ID == table_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("fail to check and wait complement task", K(ret), K(tenant_id), K(table_id)); + } else { + LOG_INFO("start to check and wait complement task", K(tenant_id), K(table_id), K(inner_sql_exec_addr), K(trace_id)); + do { + if (OB_FAIL(check_all_tablet_sstable_status(tenant_id, + table_id, + scn, + is_all_sstable_build_finished))) { + LOG_WARN("fail to check task tablet sstable status", K(ret), K(tenant_id), K(table_id), K(scn)); + } else if (is_all_sstable_build_finished) { + LOG_INFO("all tablet sstable has build finished"); + } else { + if (OB_FAIL(check_task_inner_sql_session_status(inner_sql_exec_addr, + trace_id, + tenant_id, + schema_version, + scn, + is_old_task_session_exist))) { + LOG_WARN("fail check task inner sql session status", K(ret), K(trace_id), K(inner_sql_exec_addr)); + } else if (!is_old_task_session_exist) { + LOG_WARN("old inner sql session is not exist.", K(ret)); + } else { + usleep(10 * 1000); // sleep 10ms + } + } + need_wait = !is_all_sstable_build_finished && is_old_task_session_exist; + } while (OB_SUCC(ret) && need_wait); // TODO: time out + ///// end + /* Only in table all sstables not finished case, we will do retry */ + if (is_all_sstable_build_finished) { + need_exec_new_inner_sql = false; + LOG_INFO("no need to execute inner sql to do complement.", K(need_exec_new_inner_sql)); + } + } + LOG_INFO("end to check and wait complement task", K(ret), + K(table_id), K(is_old_task_session_exist), K(is_all_sstable_build_finished), K(need_exec_new_inner_sql)); + + return ret; +} diff --git a/src/share/ob_ddl_common.h b/src/share/ob_ddl_common.h index c72d81c44a..2c0969ffb8 100644 --- a/src/share/ob_ddl_common.h +++ b/src/share/ob_ddl_common.h @@ -16,6 +16,7 @@ #include "lib/allocator/page_arena.h" #include "share/schema/ob_table_schema.h" #include "share/schema/ob_schema_service.h" +#include "share/location_cache/ob_location_struct.h" #include "storage/tablet/ob_tablet_common.h" namespace oceanbase @@ -282,6 +283,22 @@ public: && CS_TYPE_BINARY != obj_meta.get_collation_type(); } + static int get_sys_ls_leader_addr( + const uint64_t cluster_id, + const uint64_t tenant_id, + common::ObAddr &leader_addr); + + static int get_tablet_paxos_member_list( + const uint64_t tenant_id, + const common::ObTabletID &tablet_id, + common::ObIArray &paxos_server_list, + int64_t &paxos_member_count); + + static int get_tablet_replica_location( + const uint64_t tenant_id, + const common::ObTabletID &tablet_id, + ObLSLocation &location); + private: static int generate_column_name_str( const common::ObIArray &column_names, @@ -307,6 +324,76 @@ private: }; +class ObCheckTabletDataComplementOp +{ +public: + + static int check_and_wait_old_complement_task( + const uint64_t tenant_id, + const uint64_t index_table_id, + const common::ObAddr &inner_sql_exec_addr, + const common::ObCurTraceId::TraceId &trace_id, + const int64_t schema_version, + const int64_t scn, + bool &need_exec_new_inner_sql); + +private: + + static int check_all_tablet_sstable_status( + const uint64_t tenant_id, + const uint64_t index_table_id, + const int64_t snapshot_version, + bool &is_all_sstable_build_finished); + + static int check_task_inner_sql_session_status( + const common::ObAddr &inner_sql_exec_addr, + const common::ObCurTraceId::TraceId &trace_id, + const uint64_t tenant_id, + const int64_t schema_version, + const int64_t scn, + bool &is_old_task_session_exist); + + static int do_check_tablets_merge_status( + const uint64_t tenant_id, + const int64_t snapshot_version, + const ObIArray &tablet_ids, + const ObLSID &ls_id, + hash::ObHashMap> &ip_tablets_map, + hash::ObHashMap &tablets_commited_map, + int64_t &tablet_commit_count); + + static int check_tablet_merge_status( + const uint64_t tenant_id, + const ObIArray &tablet_ids, + const int64_t snapshot_version, + bool &is_all_tablets_commited); + + static int update_replica_merge_status( + const ObTabletID &tablet_id, + const int merge_status, + hash::ObHashMap &tablets_commited_map); + + + static int calculate_build_finish( + const uint64_t tenant_id, + const common::ObIArray &tablet_ids, + hash::ObHashMap &tablets_commited_map, + int64_t &commit_succ_count); + + static int construct_ls_tablet_map( + const uint64_t tenant_id, + const common::ObTabletID &tablet_id, + hash::ObHashMap> &ls_tablets_map); + + static int construct_tablet_ip_map( + const uint64_t tenant_id, + const ObTabletID &tablet_id, + hash::ObHashMap> &ip_tablets_map); + +}; + + + } // end namespace share } // end namespace oceanbase diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index 9a2db64eed..bad843a456 100644 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -425,6 +425,7 @@ class ObString; ACT(BEFORE_TABLE_REDEFINITION_TASK_EFFECT,)\ ACT(ALTER_LS_CHOOSE_SRC,)\ ACT(BEFORE_LOCK_SERVICE_UNLOCK,)\ + ACT(DDL_CHECK_TABLET_MERGE_STATUS,)\ ACT(MAX_DEBUG_SYNC_POINT,) DECLARE_ENUM(ObDebugSyncPoint, debug_sync_point, OB_DEBUG_SYNC_POINT_DEF); diff --git a/src/share/ob_rpc_struct.cpp b/src/share/ob_rpc_struct.cpp index b405bd0871..1e26cbef82 100644 --- a/src/share/ob_rpc_struct.cpp +++ b/src/share/ob_rpc_struct.cpp @@ -1629,7 +1629,8 @@ OB_DEF_SERIALIZE(ObAlterTableArg) skip_sys_table_check_, need_rebuild_trigger_, foreign_key_checks_, - is_add_to_scheduler_); + is_add_to_scheduler_, + inner_sql_exec_addr_); return ret; } @@ -1717,7 +1718,8 @@ OB_DEF_DESERIALIZE(ObAlterTableArg) skip_sys_table_check_, need_rebuild_trigger_, foreign_key_checks_, - is_add_to_scheduler_); + is_add_to_scheduler_, + inner_sql_exec_addr_); return ret; } @@ -1758,7 +1760,8 @@ OB_DEF_SERIALIZE_SIZE(ObAlterTableArg) skip_sys_table_check_, need_rebuild_trigger_, foreign_key_checks_, - is_add_to_scheduler_); + is_add_to_scheduler_, + inner_sql_exec_addr_); } if (OB_FAIL(ret)) { @@ -2120,7 +2123,8 @@ DEF_TO_STRING(ObCreateIndexArg) K_(nls_date_format), K_(nls_timestamp_format), K_(nls_timestamp_tz_format), - K_(sql_mode)); + K_(sql_mode), + K_(inner_sql_exec_addr)); J_OBJ_END(); return pos; } @@ -2142,7 +2146,8 @@ OB_SERIALIZE_MEMBER((ObCreateIndexArg, ObIndexArg), nls_date_format_, nls_timestamp_format_, nls_timestamp_tz_format_, - sql_mode_); + sql_mode_, + inner_sql_exec_addr_); bool ObAlterIndexArg::is_valid() const { @@ -3120,6 +3125,20 @@ bool ObCheckModifyTimeElapsedArg::is_valid() const return bret; } +int ObDDLCheckTabletMergeStatusArg::assign(const ObDDLCheckTabletMergeStatusArg &other) { + int ret = OB_SUCCESS; + if (OB_FAIL(tablet_ids_.assign(other.tablet_ids_))) { + LOG_WARN("assign tablet_ids_ failed", K(ret), K(other.tablet_ids_)); + } else { + tenant_id_ = other.tenant_id_; + ls_id_ = other.ls_id_; + snapshot_version_ = other.snapshot_version_; + } + return ret; +} + +OB_SERIALIZE_MEMBER(ObDDLCheckTabletMergeStatusArg, tenant_id_, ls_id_, tablet_ids_, snapshot_version_); + void ObCheckModifyTimeElapsedArg::reuse() { tenant_id_ = OB_INVALID_ID; @@ -3144,6 +3163,8 @@ OB_SERIALIZE_MEMBER(ObCheckSchemaVersionElapsedResult, results_); OB_SERIALIZE_MEMBER(CandidateStatus, candidate_status_); +OB_SERIALIZE_MEMBER(ObDDLCheckTabletMergeStatusResult, merge_status_); + //----Structs for managing privileges---- OB_SERIALIZE_MEMBER(ObAccountArg, user_name_, diff --git a/src/share/ob_rpc_struct.h b/src/share/ob_rpc_struct.h index 9f0adc4af7..04783817d6 100644 --- a/src/share/ob_rpc_struct.h +++ b/src/share/ob_rpc_struct.h @@ -1612,7 +1612,8 @@ public: skip_sys_table_check_(false), need_rebuild_trigger_(false), foreign_key_checks_(true), - is_add_to_scheduler_(false) + is_add_to_scheduler_(false), + inner_sql_exec_addr_() { } virtual ~ObAlterTableArg() @@ -1676,7 +1677,8 @@ public: K_(foreign_key_checks), K_(is_add_to_scheduler), K_(table_id), - K_(hidden_table_id)); + K_(hidden_table_id), + K_(inner_sql_exec_addr)); private: int alloc_index_arg(const ObIndexArg::IndexActionType index_action_type, ObIndexArg *&index_arg); public: @@ -1707,6 +1709,7 @@ public: bool need_rebuild_trigger_; bool foreign_key_checks_; bool is_add_to_scheduler_; + common::ObAddr inner_sql_exec_addr_; int serialize_index_args(char *buf, const int64_t data_len, int64_t &pos) const; int deserialize_index_args(const char *buf, const int64_t data_len, int64_t &pos); int64_t get_index_args_serialize_size() const; @@ -1949,7 +1952,8 @@ public: nls_date_format_(), nls_timestamp_format_(), nls_timestamp_tz_format_(), - sql_mode_(0) + sql_mode_(0), + inner_sql_exec_addr_() { index_action_type_ = ADD_INDEX; index_using_type_ = share::schema::USING_BTREE; @@ -1976,6 +1980,7 @@ public: nls_timestamp_format_.reset(); nls_timestamp_tz_format_.reset(); sql_mode_ = 0; + inner_sql_exec_addr_.reset(); } void set_index_action_type(const IndexActionType type) { index_action_type_ = type; } bool is_valid() const; @@ -2006,6 +2011,7 @@ public: nls_timestamp_format_ = other.nls_timestamp_format_; nls_timestamp_tz_format_ = other.nls_timestamp_tz_format_; sql_mode_ = other.sql_mode_; + inner_sql_exec_addr_ = other.inner_sql_exec_addr_; } return ret; } @@ -2036,6 +2042,7 @@ public: common::ObString nls_timestamp_format_; common::ObString nls_timestamp_tz_format_; ObSQLMode sql_mode_; + common::ObAddr inner_sql_exec_addr_; }; typedef ObCreateIndexArg ObAlterPrimaryArg; @@ -3640,6 +3647,33 @@ public: ObSEArray tablets_; }; +struct ObDDLCheckTabletMergeStatusArg final +{ + OB_UNIS_VERSION(1); +public: + ObDDLCheckTabletMergeStatusArg() + : tenant_id_(), ls_id_(), tablet_ids_(), snapshot_version_() + {} + ~ObDDLCheckTabletMergeStatusArg() = default; + bool is_valid() const { + return OB_INVALID_TENANT_ID != tenant_id_ && + common::OB_INVALID_TIMESTAMP != snapshot_version_ && + ls_id_.is_valid() && + tablet_ids_.count() > 0; + } + int assign(const ObDDLCheckTabletMergeStatusArg &other); + void reset() { + ls_id_.reset(); + tablet_ids_.reset(); + } +public: + TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_ids), K_(snapshot_version)); + uint64_t tenant_id_; + share::ObLSID ls_id_; + common::ObSArray tablet_ids_; + int64_t snapshot_version_; +}; + struct ObCheckModifyTimeElapsedArg final { OB_UNIS_VERSION(1); @@ -3664,6 +3698,18 @@ public: transaction::ObTransID pending_tx_id_; }; +struct ObDDLCheckTabletMergeStatusResult +{ + OB_UNIS_VERSION(1); +public: + ObDDLCheckTabletMergeStatusResult() + : merge_status_() {} + void reset() { merge_status_.reset(); } +public: + TO_STRING_KV(K_(merge_status)); + common::ObSArray merge_status_; +}; + struct ObCheckSchemaVersionElapsedResult { OB_UNIS_VERSION(1); diff --git a/src/share/ob_srv_rpc_proxy.h b/src/share/ob_srv_rpc_proxy.h index 2aaff56604..72caa6fefe 100644 --- a/src/share/ob_srv_rpc_proxy.h +++ b/src/share/ob_srv_rpc_proxy.h @@ -75,6 +75,8 @@ public: RPC_AP(PR5 check_schema_version_elapsed, OB_CHECK_SCHEMA_VERSION_ELAPSED, (ObCheckSchemaVersionElapsedArg), ObCheckSchemaVersionElapsedResult); RPC_AP(PR5 check_modify_time_elapsed, OB_CHECK_MODIFY_TIME_ELAPSED, (ObCheckModifyTimeElapsedArg), ObCheckModifyTimeElapsedResult); + RPC_AP(PR5 check_ddl_tablet_merge_status, OB_DDL_CHECK_TABLET_MERGE_STATUS, (ObDDLCheckTabletMergeStatusArg), ObDDLCheckTabletMergeStatusResult); + RPC_S(PR5 switch_leader, OB_SWITCH_LEADER, (ObSwitchLeaderArg)); RPC_S(PR5 batch_switch_rs_leader, OB_BATCH_SWITCH_RS_LEADER, (ObAddr)); RPC_S(PR5 get_partition_count, OB_GET_PARTITION_COUNT, diff --git a/src/sql/das/ob_das_location_router.cpp b/src/sql/das/ob_das_location_router.cpp index 61e42189de..c88340f6bf 100644 --- a/src/sql/das/ob_das_location_router.cpp +++ b/src/sql/das/ob_das_location_router.cpp @@ -99,6 +99,7 @@ int VirtualSvrPair::get_part_and_tablet_id_by_server(const ObAddr &addr, if (!tablet_id.is_valid()) { LOG_DEBUG("virtual table partition not exists", K(ret), K(addr)); } + return ret; } diff --git a/src/sql/engine/cmd/ob_kill_executor.cpp b/src/sql/engine/cmd/ob_kill_executor.cpp index 4add6dad58..20f4d47232 100644 --- a/src/sql/engine/cmd/ob_kill_executor.cpp +++ b/src/sql/engine/cmd/ob_kill_executor.cpp @@ -123,15 +123,30 @@ int ObKillExecutor::get_remote_session_location(const ObKillSessionArg &arg, ObE if (OB_FAIL(ret)) { } else if (OB_FAIL(result_set->next())) { if (OB_LIKELY(OB_ITER_END == ret)) { - ret = OB_UNKNOWN_CONNECTION; + read_sql.reuse(); + if (OB_FAIL(generate_read_sql_from_session_info(arg.sess_id_, read_sql))) { + LOG_WARN("fail to generate sql", K(ret), K(read_sql)); + } else if (OB_FAIL(sql_proxy->read(res, read_sql.ptr()))) { + LOG_WARN("fail to read by sql proxy", K(ret), K(read_sql)); + } else if (OB_ISNULL(result_set = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("result set is NULL", K(ret), K(read_sql)); + } else if (OB_FAIL(result_set->next())) { + if (OB_LIKELY(OB_ITER_END == ret)) { + ret = OB_UNKNOWN_CONNECTION; + LOG_WARN("fail to get next row", K(ret), K(result_set)); + } + } } - LOG_WARN("fail to get next row", K(ret), K(result_set)); + } + + if (OB_FAIL(ret)) { } else { UNUSED(tmp_real_str_len); EXTRACT_STRBUF_FIELD_MYSQL(*result_set, "svr_ip", svr_ip, OB_IP_STR_BUFF, tmp_real_str_len); EXTRACT_INT_FIELD_MYSQL(*result_set, "svr_port", svr_port, int64_t); } - + //set addr if (OB_FAIL(ret)) { } else if (OB_UNLIKELY(OB_ITER_END != result_set->next())) { @@ -157,6 +172,17 @@ int ObKillExecutor::generate_read_sql(uint32_t sess_id, ObSqlString &sql) return ret; } +int ObKillExecutor::generate_read_sql_from_session_info(uint32_t sess_id, ObSqlString &sql) +{ + int ret = OB_SUCCESS; + const char *sql_str = "select svr_ip, svr_port from oceanbase.__all_virtual_session_info \ + where id = %u"; + if (OB_FAIL(sql.append_fmt(sql_str, sess_id))) { + LOG_WARN("fail to append sql", K(ret), K(sess_id)); + } + return ret; +} + int ObKillExecutor::kill_remote_session(ObExecContext &ctx, const ObAddr &addr, const ObKillSessionArg &arg) { int ret = OB_SUCCESS; diff --git a/src/sql/engine/cmd/ob_kill_executor.h b/src/sql/engine/cmd/ob_kill_executor.h index 1b8a74309c..592a0494e3 100644 --- a/src/sql/engine/cmd/ob_kill_executor.h +++ b/src/sql/engine/cmd/ob_kill_executor.h @@ -52,6 +52,7 @@ public: private: int get_remote_session_location(const ObKillSessionArg &arg, ObExecContext &ctx, common::ObAddr &addr); int generate_read_sql(uint32_t sess_id, common::ObSqlString &sql); + int generate_read_sql_from_session_info(uint32_t sess_id, common::ObSqlString &sql); int kill_remote_session(ObExecContext &ctx, const common::ObAddr &addr, const ObKillSessionArg &arg); DISALLOW_COPY_AND_ASSIGN(ObKillExecutor);