mark dblink connection unusable when execute query failed
This commit is contained in:
@ -78,7 +78,7 @@ public:
|
|||||||
dblink_id_(OB_INVALID_ID),
|
dblink_id_(OB_INVALID_ID),
|
||||||
dblink_driver_proto_(-1),
|
dblink_driver_proto_(-1),
|
||||||
has_reverse_link_credentials_(false),
|
has_reverse_link_credentials_(false),
|
||||||
exec_succ_(true)
|
usable_(true)
|
||||||
{}
|
{}
|
||||||
virtual ~ObISQLConnection() {}
|
virtual ~ObISQLConnection() {}
|
||||||
|
|
||||||
@ -134,15 +134,15 @@ public:
|
|||||||
bool get_init_remote_env() const { return is_init_remote_env_; }
|
bool get_init_remote_env() const { return is_init_remote_env_; }
|
||||||
void set_reverse_link_creadentials(bool flag) { has_reverse_link_credentials_ = flag; }
|
void set_reverse_link_creadentials(bool flag) { has_reverse_link_credentials_ = flag; }
|
||||||
bool get_reverse_link_creadentials() { return has_reverse_link_credentials_; }
|
bool get_reverse_link_creadentials() { return has_reverse_link_credentials_; }
|
||||||
void set_exec_succ(bool flag) { exec_succ_ = flag; }
|
void set_usable(bool flag) { usable_ = flag; }
|
||||||
bool get_exec_succ() { return exec_succ_; }
|
bool usable() { return usable_; }
|
||||||
protected:
|
protected:
|
||||||
bool oracle_mode_;
|
bool oracle_mode_;
|
||||||
bool is_init_remote_env_; // for dblink, we have to init remote env with some sql
|
bool is_init_remote_env_; // for dblink, we have to init remote env with some sql
|
||||||
uint64_t dblink_id_; // for dblink, record dblink_id of a connection used by dblink
|
uint64_t dblink_id_; // for dblink, record dblink_id of a connection used by dblink
|
||||||
int64_t dblink_driver_proto_; //for dblink, record DblinkDriverProto of a connection used by dblink
|
int64_t dblink_driver_proto_; //for dblink, record DblinkDriverProto of a connection used by dblink
|
||||||
bool has_reverse_link_credentials_; // for dblink, mark if this link has credentials set
|
bool has_reverse_link_credentials_; // for dblink, mark if this link has credentials set
|
||||||
bool exec_succ_;
|
bool usable_; // usable_ = false: connection is unusable, should not execute query again.
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end namespace sqlclient
|
} // end namespace sqlclient
|
||||||
|
@ -172,6 +172,7 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d
|
|||||||
my_bool reconnect = 0; // in OB, do manual reconnect. xiaochu.yh
|
my_bool reconnect = 0; // in OB, do manual reconnect. xiaochu.yh
|
||||||
mysql_options(&mysql_, MYSQL_OPT_RECONNECT, &reconnect);
|
mysql_options(&mysql_, MYSQL_OPT_RECONNECT, &reconnect);
|
||||||
closed_ = false;
|
closed_ = false;
|
||||||
|
set_usable(true);
|
||||||
tenant_id_ = OB_SYS_TENANT_ID;
|
tenant_id_ = OB_SYS_TENANT_ID;
|
||||||
read_consistency_ = -1;
|
read_consistency_ = -1;
|
||||||
}
|
}
|
||||||
@ -241,6 +242,7 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d
|
|||||||
my_bool reconnect = 0; // in OB, do manual reconnect. xiaochu.yh
|
my_bool reconnect = 0; // in OB, do manual reconnect. xiaochu.yh
|
||||||
mysql_options(&mysql_, MYSQL_OPT_RECONNECT, &reconnect);
|
mysql_options(&mysql_, MYSQL_OPT_RECONNECT, &reconnect);
|
||||||
closed_ = false;
|
closed_ = false;
|
||||||
|
set_usable(true);
|
||||||
db_name_ = db;
|
db_name_ = db;
|
||||||
tenant_id_ = OB_SYS_TENANT_ID;
|
tenant_id_ = OB_SYS_TENANT_ID;
|
||||||
read_consistency_ = -1;
|
read_consistency_ = -1;
|
||||||
|
@ -444,7 +444,6 @@ int ObMySQLConnectionPool::acquire(const uint64_t tenant_id, ObMySQLConnection *
|
|||||||
LOG_WARN("fail to get connection", K(ret), K(tenant_id));
|
LOG_WARN("fail to get connection", K(ret), K(tenant_id));
|
||||||
} else if (OB_FAIL(try_connect(connection))) {
|
} else if (OB_FAIL(try_connect(connection))) {
|
||||||
LOG_WARN("failed to try connection, will release connection", K(ret), K(tenant_id));
|
LOG_WARN("failed to try connection, will release connection", K(ret), K(tenant_id));
|
||||||
connection->set_exec_succ(false);
|
|
||||||
const bool succ = false;
|
const bool succ = false;
|
||||||
if (OB_SUCCESS != release(connection, succ)) { // ignore ret
|
if (OB_SUCCESS != release(connection, succ)) { // ignore ret
|
||||||
LOG_WARN("failed to release connection, ignore ret");
|
LOG_WARN("failed to release connection, ignore ret");
|
||||||
@ -799,7 +798,6 @@ int ObMySQLConnectionPool::acquire_dblink(uint64_t dblink_id, const dblink_param
|
|||||||
LOG_WARN("fail to acquire dblink", K(ret), K(dblink_id));
|
LOG_WARN("fail to acquire dblink", K(ret), K(dblink_id));
|
||||||
} else if (OB_FAIL(try_connect_dblink(dblink_conn, sql_request_level))) {
|
} else if (OB_FAIL(try_connect_dblink(dblink_conn, sql_request_level))) {
|
||||||
LOG_WARN("fail to try connect dblink", K(ret), K(dblink_id));
|
LOG_WARN("fail to try connect dblink", K(ret), K(dblink_id));
|
||||||
dblink_conn->set_exec_succ(false);
|
|
||||||
int release_ret = release_dblink(dblink_conn, sessid);
|
int release_ret = release_dblink(dblink_conn, sessid);
|
||||||
if (release_ret != OB_SUCCESS) {
|
if (release_ret != OB_SUCCESS) {
|
||||||
LOG_WARN("fail to release dblink conn", K(release_ret), K(dblink_id));
|
LOG_WARN("fail to release dblink conn", K(release_ret), K(dblink_id));
|
||||||
@ -812,7 +810,7 @@ int ObMySQLConnectionPool::acquire_dblink(uint64_t dblink_id, const dblink_param
|
|||||||
int ObMySQLConnectionPool::release_dblink(ObISQLConnection *dblink_conn, uint32_t sessid)
|
int ObMySQLConnectionPool::release_dblink(ObISQLConnection *dblink_conn, uint32_t sessid)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const bool succ = OB_NOT_NULL(dblink_conn) ? dblink_conn->get_exec_succ() : false;
|
const bool succ = OB_NOT_NULL(dblink_conn) ? dblink_conn->usable() : false;
|
||||||
if (OB_FAIL(release(dynamic_cast<ObMySQLConnection *>(dblink_conn), succ, sessid))) {
|
if (OB_FAIL(release(dynamic_cast<ObMySQLConnection *>(dblink_conn), succ, sessid))) {
|
||||||
LOG_WARN("fail to release dblink conn", K(ret));
|
LOG_WARN("fail to release dblink conn", K(ret));
|
||||||
}
|
}
|
||||||
|
@ -512,9 +512,6 @@ int ObDbLinkProxy::execute_init_sql(ObISQLConnection *dblink_conn, int link_type
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret) && OB_NOT_NULL(dblink_conn)) {
|
|
||||||
dblink_conn->set_exec_succ(false);
|
|
||||||
}
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -542,7 +539,6 @@ int ObDbLinkProxy::dblink_read(ObISQLConnection *dblink_conn, ReadResult &result
|
|||||||
LOG_WARN("null ptr", K(ret), KP(dblink_conn), KP(sql));
|
LOG_WARN("null ptr", K(ret), KP(dblink_conn), KP(sql));
|
||||||
} else if (OB_FAIL(dblink_conn->execute_read(OB_INVALID_TENANT_ID, sql, result))) {
|
} else if (OB_FAIL(dblink_conn->execute_read(OB_INVALID_TENANT_ID, sql, result))) {
|
||||||
LOG_WARN("read from dblink failed", K(ret), K(dblink_conn), KCSTRING(sql));
|
LOG_WARN("read from dblink failed", K(ret), K(dblink_conn), KCSTRING(sql));
|
||||||
dblink_conn->set_exec_succ(false);
|
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("succ to read from dblink", K(sql));
|
LOG_DEBUG("succ to read from dblink", K(sql));
|
||||||
}
|
}
|
||||||
@ -557,7 +553,6 @@ int ObDbLinkProxy::dblink_write(ObISQLConnection *dblink_conn, int64_t &affected
|
|||||||
LOG_WARN("null ptr", K(ret), KP(dblink_conn), KP(sql));
|
LOG_WARN("null ptr", K(ret), KP(dblink_conn), KP(sql));
|
||||||
} else if (OB_FAIL(dblink_conn->execute_write(OB_INVALID_TENANT_ID, sql, affected_rows))) {
|
} else if (OB_FAIL(dblink_conn->execute_write(OB_INVALID_TENANT_ID, sql, affected_rows))) {
|
||||||
LOG_WARN("write to dblink failed", K(ret), K(dblink_conn), K(sql));
|
LOG_WARN("write to dblink failed", K(ret), K(dblink_conn), K(sql));
|
||||||
dblink_conn->set_exec_succ(false);
|
|
||||||
} else {
|
} else {
|
||||||
LOG_DEBUG("succ to write by dblink", K(sql));
|
LOG_DEBUG("succ to write by dblink", K(sql));
|
||||||
}
|
}
|
||||||
@ -571,7 +566,6 @@ int ObDbLinkProxy::rollback(ObISQLConnection *dblink_conn)
|
|||||||
LOG_WARN("dblink conn is NULL", K(ret));
|
LOG_WARN("dblink conn is NULL", K(ret));
|
||||||
} else if (OB_FAIL(dblink_conn->rollback())) {
|
} else if (OB_FAIL(dblink_conn->rollback())) {
|
||||||
LOG_WARN("read from dblink failed", K(ret));
|
LOG_WARN("read from dblink failed", K(ret));
|
||||||
dblink_conn->set_exec_succ(false);
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -88,16 +88,21 @@ int ObMySQLStatement::execute_update(int64_t &affected_rows)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = 0;
|
int tmp_ret = 0;
|
||||||
|
const int CR_SERVER_LOST = 2013;
|
||||||
if (OB_ISNULL(conn_) || OB_ISNULL(stmt_) || OB_ISNULL(sql_str_)) {
|
if (OB_ISNULL(conn_) || OB_ISNULL(stmt_) || OB_ISNULL(sql_str_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_ERROR("invalid mysql stmt", K_(conn), KP_(stmt), KP_(sql_str), K(ret));
|
LOG_ERROR("invalid mysql stmt", K_(conn), KP_(stmt), KP_(sql_str), K(ret));
|
||||||
|
} else if (OB_UNLIKELY(!conn_->usable())) {
|
||||||
|
ret = -CR_SERVER_LOST;
|
||||||
|
conn_->set_last_error(ret);
|
||||||
|
LOG_WARN("conn already failed, should not execute query again!", K(conn_));
|
||||||
} else {
|
} else {
|
||||||
if (OB_UNLIKELY(!conn_->get_exec_succ())) {
|
|
||||||
LOG_ERROR("conn already failed, should not execute query again!", K(conn_));
|
|
||||||
}
|
|
||||||
int64_t begin = ObTimeUtility::current_monotonic_raw_time();
|
int64_t begin = ObTimeUtility::current_monotonic_raw_time();
|
||||||
if (0 != (tmp_ret = mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_)))) {
|
if (0 != (tmp_ret = mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_)))) {
|
||||||
ret = -mysql_errno(stmt_);
|
ret = -mysql_errno(stmt_);
|
||||||
|
if (is_need_disconnect_error(ret)) {
|
||||||
|
conn_->set_usable(false);
|
||||||
|
}
|
||||||
LOG_WARN("fail to query server","server", stmt_->host, "port", stmt_->port,
|
LOG_WARN("fail to query server","server", stmt_->host, "port", stmt_->port,
|
||||||
"err_msg", mysql_error(stmt_), K(tmp_ret), K(ret), K(sql_str_));
|
"err_msg", mysql_error(stmt_), K(tmp_ret), K(ret), K(sql_str_));
|
||||||
if (OB_NOT_MASTER == tmp_ret) {
|
if (OB_NOT_MASTER == tmp_ret) {
|
||||||
@ -118,16 +123,21 @@ ObMySQLResult *ObMySQLStatement::execute_query()
|
|||||||
{
|
{
|
||||||
ObMySQLResult *result = NULL;
|
ObMySQLResult *result = NULL;
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const int CR_SERVER_LOST = 2013;
|
||||||
if (OB_ISNULL(conn_) || OB_ISNULL(stmt_) || OB_ISNULL(sql_str_)) {
|
if (OB_ISNULL(conn_) || OB_ISNULL(stmt_) || OB_ISNULL(sql_str_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_ERROR("invalid mysql stmt", K_(conn), K_(stmt), KP_(sql_str), K(ret));
|
LOG_ERROR("invalid mysql stmt", K_(conn), K_(stmt), KP_(sql_str), K(ret));
|
||||||
|
} else if (OB_UNLIKELY(!conn_->usable())) {
|
||||||
|
ret = -CR_SERVER_LOST;
|
||||||
|
conn_->set_last_error(ret);
|
||||||
|
LOG_WARN("conn already failed, should not execute query again", K(conn_));
|
||||||
} else {
|
} else {
|
||||||
int64_t begin = ObTimeUtility::current_monotonic_raw_time();
|
int64_t begin = ObTimeUtility::current_monotonic_raw_time();
|
||||||
if (OB_UNLIKELY(!conn_->get_exec_succ())) {
|
|
||||||
LOG_ERROR("conn already failed, should not execute query again!", K(conn_));
|
|
||||||
}
|
|
||||||
if (0 != mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_))) {
|
if (0 != mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_))) {
|
||||||
ret = -mysql_errno(stmt_);
|
ret = -mysql_errno(stmt_);
|
||||||
|
if (is_need_disconnect_error(ret)) {
|
||||||
|
conn_->set_usable(false);
|
||||||
|
}
|
||||||
const int ER_LOCK_WAIT_TIMEOUT = -1205;
|
const int ER_LOCK_WAIT_TIMEOUT = -1205;
|
||||||
if (ER_LOCK_WAIT_TIMEOUT == ret) {
|
if (ER_LOCK_WAIT_TIMEOUT == ret) {
|
||||||
LOG_INFO("fail to query server", "host", stmt_->host, "port", stmt_->port,
|
LOG_INFO("fail to query server", "host", stmt_->host, "port", stmt_->port,
|
||||||
@ -151,6 +161,16 @@ ObMySQLResult *ObMySQLStatement::execute_query()
|
|||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ObMySQLStatement::is_need_disconnect_error(int ret)
|
||||||
|
{
|
||||||
|
const int CLIENT_MIN_ERROR_CODE = 2000;
|
||||||
|
const int CLIENT_MAX_ERROR_CODE = 2099;
|
||||||
|
// need close connection when there is a client error
|
||||||
|
ret = abs(ret);
|
||||||
|
return ret >= CLIENT_MIN_ERROR_CODE && ret <= CLIENT_MAX_ERROR_CODE;
|
||||||
|
}
|
||||||
|
|
||||||
} // end namespace sqlclient
|
} // end namespace sqlclient
|
||||||
} // end namespace common
|
} // end namespace common
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
@ -50,6 +50,7 @@ public:
|
|||||||
* but ignore affected_rows
|
* but ignore affected_rows
|
||||||
*/
|
*/
|
||||||
int execute_update();
|
int execute_update();
|
||||||
|
bool is_need_disconnect_error(const int ret);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ! Deprecated
|
* ! Deprecated
|
||||||
|
@ -444,7 +444,7 @@ int ObDblinkCtxInSession::set_dblink_conn(common::sqlclient::ObISQLConnection *d
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDblinkCtxInSession::clean_dblink_conn()
|
int ObDblinkCtxInSession::clean_dblink_conn(const bool force_disconnect)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
common::sqlclient::ObISQLConnection *dblink_conn =NULL;
|
common::sqlclient::ObISQLConnection *dblink_conn =NULL;
|
||||||
@ -464,7 +464,8 @@ int ObDblinkCtxInSession::clean_dblink_conn()
|
|||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("server_conn_pool of dblink connection is NULL", K(this), K(dblink_conn), K(i), K(ret));
|
LOG_WARN("server_conn_pool of dblink connection is NULL", K(this), K(dblink_conn), K(i), K(ret));
|
||||||
} else {
|
} else {
|
||||||
if (OB_FAIL(server_conn_pool->release(dblink_conn, true, session_info_->get_sessid()))) {
|
const bool need_disconnect = force_disconnect || !dblink_conn->usable();
|
||||||
|
if (OB_FAIL(server_conn_pool->release(dblink_conn, !need_disconnect, session_info_->get_sessid()))) {
|
||||||
LOG_WARN("session failed to release dblink connection", K(session_info_->get_sessid()), K(this), KP(dblink_conn), K(i), K(ret));
|
LOG_WARN("session failed to release dblink connection", K(session_info_->get_sessid()), K(this), KP(dblink_conn), K(i), K(ret));
|
||||||
} else {
|
} else {
|
||||||
LOG_TRACE("session succ to release dblink connection", K(session_info_->get_sessid()), K(this), KP(dblink_conn), K(i), K(ret));
|
LOG_TRACE("session succ to release dblink connection", K(session_info_->get_sessid()), K(this), KP(dblink_conn), K(i), K(ret));
|
||||||
|
@ -121,16 +121,16 @@ public:
|
|||||||
inline void reset()
|
inline void reset()
|
||||||
{
|
{
|
||||||
arena_alloc_.reset();
|
arena_alloc_.reset();
|
||||||
clean_dblink_conn();
|
const bool force_disconnect = true;
|
||||||
|
clean_dblink_conn(force_disconnect);
|
||||||
free_dblink_conn_pool();
|
free_dblink_conn_pool();
|
||||||
session_info_ = NULL;
|
|
||||||
reverse_dblink_ = NULL;
|
reverse_dblink_ = NULL;
|
||||||
}
|
}
|
||||||
int register_dblink_conn_pool(common::sqlclient::ObCommonServerConnectionPool *dblink_conn_pool);
|
int register_dblink_conn_pool(common::sqlclient::ObCommonServerConnectionPool *dblink_conn_pool);
|
||||||
int free_dblink_conn_pool();
|
int free_dblink_conn_pool();
|
||||||
int get_dblink_conn(uint64_t dblink_id, common::sqlclient::ObISQLConnection *&dblink_conn);
|
int get_dblink_conn(uint64_t dblink_id, common::sqlclient::ObISQLConnection *&dblink_conn);
|
||||||
int set_dblink_conn(common::sqlclient::ObISQLConnection *dblink_conn);
|
int set_dblink_conn(common::sqlclient::ObISQLConnection *dblink_conn);
|
||||||
int clean_dblink_conn();
|
int clean_dblink_conn(const bool force_disconnect);
|
||||||
inline bool is_dblink_xa_tras() { return !dblink_conn_holder_array_.empty(); }
|
inline bool is_dblink_xa_tras() { return !dblink_conn_holder_array_.empty(); }
|
||||||
int get_reverse_link(ObReverseLink *&reverse_dblink);
|
int get_reverse_link(ObReverseLink *&reverse_dblink);
|
||||||
private:
|
private:
|
||||||
|
@ -161,7 +161,7 @@ int ObLinkOp::init_dblink(uint64_t dblink_id, ObDbLinkProxy *dblink_proxy, bool
|
|||||||
} else {
|
} else {
|
||||||
dblink_conn_ = dblink_conn;
|
dblink_conn_ = dblink_conn;
|
||||||
in_xa_trascaction_ = true; //to tell link scan op don't release dblink_conn_
|
in_xa_trascaction_ = true; //to tell link scan op don't release dblink_conn_
|
||||||
LOG_INFO("link op get connection from xa trasaction", KP(dblink_conn_), K(lbt()));
|
LOG_TRACE("link op get connection from xa trasaction", K(dblink_id), KP(dblink_conn_));
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
dblink_id_ = dblink_id;
|
dblink_id_ = dblink_id;
|
||||||
|
@ -346,6 +346,7 @@ void ObSQLSessionInfo::reset(bool skip_sys_var)
|
|||||||
expect_group_id_ = OB_INVALID_ID;
|
expect_group_id_ = OB_INVALID_ID;
|
||||||
group_id_not_expected_ = false;
|
group_id_not_expected_ = false;
|
||||||
//call at last time
|
//call at last time
|
||||||
|
dblink_context_.reset(); // need reset before ObBasicSessionInfo::reset(skip_sys_var);
|
||||||
ObBasicSessionInfo::reset(skip_sys_var);
|
ObBasicSessionInfo::reset(skip_sys_var);
|
||||||
txn_free_route_ctx_.reset();
|
txn_free_route_ctx_.reset();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user