fix dblink hung and translate error

This commit is contained in:
cqliang1995
2023-08-14 04:18:38 +00:00
committed by ob-robot
parent 9cd60f84b4
commit 579631624e
7 changed files with 39 additions and 58 deletions

View File

@ -51,13 +51,13 @@ int sqlclient::ObDblinkErrorTrans::external_errno_to_ob_errno(bool is_oracle_err
external_errno <= 2075 && // you will known errno in [2000, 2075] is client error at dev.mysql.com external_errno <= 2075 && // you will known errno in [2000, 2075] is client error at dev.mysql.com
(!is_oracle_err || (!is_oracle_err ||
(is_oracle_err && (is_oracle_err &&
OB_NOT_NULL(external_errmsg) && (OB_NOT_NULL(external_errmsg) && 0 != STRLEN(external_errmsg)) &&
0 != std::memcmp(oracle_msg_prefix, external_errmsg, 0 != std::memcmp(oracle_msg_prefix, external_errmsg,
std::min(STRLEN(oracle_msg_prefix), STRLEN(external_errmsg)))))) { std::min(STRLEN(oracle_msg_prefix), STRLEN(external_errmsg)))))) {
ob_errno = external_errno; // do not map, show user client errno directly. ob_errno = external_errno; // do not map, show user client errno directly.
} else { } else {
ob_errno = OB_ERR_DBLINK_REMOTE_ECODE; // default ob_errno, if external_errno can not map to any valid ob_errno ob_errno = OB_ERR_DBLINK_REMOTE_ECODE; // default ob_errno, if external_errno can not map to any valid ob_errno
if (OB_ISNULL(external_errmsg)) { if (OB_ISNULL(external_errmsg) || 0 == STRLEN(external_errmsg)) {
for (int i = 0; i < oceanbase::common::OB_MAX_ERROR_CODE; ++i) { for (int i = 0; i < oceanbase::common::OB_MAX_ERROR_CODE; ++i) {
if (external_errno == (is_oracle_err ? get_oracle_errno(i) : get_mysql_errno(i))) { if (external_errno == (is_oracle_err ? get_oracle_errno(i) : get_mysql_errno(i))) {
ob_errno = -i; ob_errno = -i;

View File

@ -231,8 +231,11 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d
MYSQL *mysql = mysql_real_connect(&mysql_, host, user, pass, db, port, NULL, 0); MYSQL *mysql = mysql_real_connect(&mysql_, host, user, pass, db, port, NULL, 0);
if (OB_ISNULL(mysql)) { if (OB_ISNULL(mysql)) {
ret = -mysql_errno(&mysql_); ret = -mysql_errno(&mysql_);
char errmsg[256] = {0};
const char *srcmsg = mysql_error(&mysql_);
MEMCPY(errmsg, srcmsg, MIN(255, STRLEN(srcmsg)));
LOG_WARN("fail to connect to mysql server", K(get_sessid()), KCSTRING(host), KCSTRING(user), KCSTRING(db), K(port), LOG_WARN("fail to connect to mysql server", K(get_sessid()), KCSTRING(host), KCSTRING(user), KCSTRING(db), K(port),
"info", mysql_error(&mysql_), K(ret)); "info", errmsg, K(ret));
if (OB_INVALID_ID != get_dblink_id()) { if (OB_INVALID_ID != get_dblink_id()) {
LOG_WARN("dblink connection error", K(ret), LOG_WARN("dblink connection error", K(ret),
KP(this), KP(this),
@ -242,8 +245,9 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d
K(user), K(user),
K(db), K(db),
K(host), K(host),
K(port)); K(port),
TRANSLATE_CLIENT_ERR_2(ret, false, mysql_error(&mysql_)); K(errmsg));
TRANSLATE_CLIENT_ERR_2(ret, false, errmsg);
} }
} else { } else {
/*Note: mysql_real_connect() incorrectly reset the MYSQL_OPT_RECONNECT option /*Note: mysql_real_connect() incorrectly reset the MYSQL_OPT_RECONNECT option

View File

@ -150,7 +150,9 @@ int ObMySQLResultImpl::next()
LOG_WARN("unexpected null ptr", K(ret)); LOG_WARN("unexpected null ptr", K(ret));
} else { } else {
ret = -mysql_errno(stmt_handler); ret = -mysql_errno(stmt_handler);
const char *errmsg = mysql_error(stmt_handler); char errmsg[256] = {0};
const char *srcmsg = mysql_error(stmt_handler);
MEMCPY(errmsg, srcmsg, MIN(255, STRLEN(srcmsg)));
ObMySQLConnection *conn = stmt_.get_connection(); ObMySQLConnection *conn = stmt_.get_connection();
if (0 == ret) { if (0 == ret) {
ret = OB_ITER_END; ret = OB_ITER_END;

View File

@ -101,8 +101,11 @@ int ObMySQLStatement::execute_update(int64_t &affected_rows)
int64_t begin = ObTimeUtility::current_monotonic_raw_time(); int64_t begin = ObTimeUtility::current_monotonic_raw_time();
if (0 != (tmp_ret = mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_)))) { if (0 != (tmp_ret = mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_)))) {
ret = -mysql_errno(stmt_); ret = -mysql_errno(stmt_);
char errmsg[256] = {0};
const char *srcmsg = mysql_error(stmt_);
MEMCPY(errmsg, srcmsg, MIN(255, STRLEN(srcmsg)));
LOG_WARN("fail to query server", "sessid", conn_->get_sessid(), "server", stmt_->host, "port", stmt_->port, LOG_WARN("fail to query server", "sessid", conn_->get_sessid(), "server", stmt_->host, "port", stmt_->port,
"err_msg", mysql_error(stmt_), K(tmp_ret), K(ret), K(sql_str_)); "err_msg", errmsg, K(tmp_ret), K(ret), K(sql_str_));
if (OB_NOT_MASTER == tmp_ret) { if (OB_NOT_MASTER == tmp_ret) {
// conn -> server pool -> connection pool // conn -> server pool -> connection pool
conn_->get_root()->get_root()->signal_refresh(); // refresh server pool immediately conn_->get_root()->get_root()->signal_refresh(); // refresh server pool immediately
@ -116,10 +119,10 @@ int ObMySQLStatement::execute_update(int64_t &affected_rows)
K(conn_->ping()), K(conn_->ping()),
K(stmt_->host), K(stmt_->host),
K(stmt_->port), K(stmt_->port),
K(mysql_error(stmt_)), K(errmsg),
K(STRLEN(sql_str_)), K(STRLEN(sql_str_)),
K(sql_str_)); K(sql_str_));
TRANSLATE_CLIENT_ERR(ret, mysql_error(stmt_)); TRANSLATE_CLIENT_ERR(ret, errmsg);
} }
if (is_need_disconnect_error(ret)) { if (is_need_disconnect_error(ret)) {
conn_->set_usable(false); conn_->set_usable(false);
@ -150,13 +153,16 @@ ObMySQLResult *ObMySQLStatement::execute_query(bool enable_use_result)
int64_t begin = ObTimeUtility::current_monotonic_raw_time(); int64_t begin = ObTimeUtility::current_monotonic_raw_time();
if (0 != mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_))) { if (0 != mysql_real_query(stmt_, sql_str_, STRLEN(sql_str_))) {
ret = -mysql_errno(stmt_); ret = -mysql_errno(stmt_);
char errmsg[256] = {0};
const char *srcmsg = mysql_error(stmt_);
MEMCPY(errmsg, srcmsg, MIN(255, STRLEN(srcmsg)));
const int ER_LOCK_WAIT_TIMEOUT = -1205; const int ER_LOCK_WAIT_TIMEOUT = -1205;
if (ER_LOCK_WAIT_TIMEOUT == ret) { if (ER_LOCK_WAIT_TIMEOUT == ret) {
LOG_INFO("fail to query server", "sessid", conn_->get_sessid(), "host", stmt_->host, "port", stmt_->port, LOG_INFO("fail to query server", "sessid", conn_->get_sessid(), "host", stmt_->host, "port", stmt_->port,
"err_msg", mysql_error(stmt_), K(ret), K(sql_str_)); "err_msg", errmsg, K(ret), K(sql_str_));
} else { } else {
LOG_WARN("fail to query server", "host", stmt_->host, "port", stmt_->port, K(conn_->get_sessid()), LOG_WARN("fail to query server", "host", stmt_->host, "port", stmt_->port, K(conn_->get_sessid()),
"err_msg", mysql_error(stmt_), K(ret), K(STRLEN(sql_str_)), K(sql_str_)); "err_msg", errmsg, K(ret), K(STRLEN(sql_str_)), K(sql_str_));
} }
if (OB_SUCCESS == ret) { if (OB_SUCCESS == ret) {
ret = OB_ERR_SQL_CLIENT; ret = OB_ERR_SQL_CLIENT;
@ -170,10 +176,10 @@ ObMySQLResult *ObMySQLStatement::execute_query(bool enable_use_result)
K(conn_->ping()), K(conn_->ping()),
K(stmt_->host), K(stmt_->host),
K(stmt_->port), K(stmt_->port),
K(mysql_error(stmt_)), K(errmsg),
K(STRLEN(sql_str_)), K(STRLEN(sql_str_)),
K(sql_str_)); K(sql_str_));
TRANSLATE_CLIENT_ERR(ret, mysql_error(stmt_)); TRANSLATE_CLIENT_ERR(ret, errmsg);
} }
if (is_need_disconnect_error(ret)) { if (is_need_disconnect_error(ret)) {
conn_->set_usable(false); conn_->set_usable(false);

View File

@ -371,6 +371,7 @@ int ObLinkOp::init_dblink_param_ctx(dblink_param_ctx &param_ctx)
param_ctx.dblink_id_ = dblink_id_; param_ctx.dblink_id_ = dblink_id_;
param_ctx.link_type_ = link_type_; param_ctx.link_type_ = link_type_;
param_ctx.sessid_ = sessid_; param_ctx.sessid_ = sessid_;
param_ctx.sql_request_level_ = next_sql_req_level_;
} }
return ret; return ret;
} }

View File

@ -274,6 +274,7 @@ int ObLinkScanOp::inner_open()
int ObLinkScanOp::inner_get_next_row() int ObLinkScanOp::inner_get_next_row()
{ {
row_allocator_.reuse(); row_allocator_.reuse();
clear_evaluated_flag();
return fetch_row(); return fetch_row();
} }
@ -302,32 +303,6 @@ int ObLinkScanOp::fetch_row()
if (OB_ITER_END != ret) { if (OB_ITER_END != ret) {
LOG_WARN("failed to get next row", K(ret)); LOG_WARN("failed to get next row", K(ret));
} else { } else {
// check if connection is alive, if not, then OB_ITER_END is a fake errno
if (sql::DblinkGetConnType::TM_CONN == conn_type_) {
if (OB_ISNULL(tm_rm_connection_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(tm_rm_connection_->ping())) {
LOG_WARN("failed to ping tm_rm_connection_", K(ret));
}
} else if (sql::DblinkGetConnType::TEMP_CONN == conn_type_) {
if (OB_ISNULL(reverse_link_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(reverse_link_->ping())) {
LOG_WARN("failed to ping reverse_link_", K(ret));
}
} else {
if (OB_ISNULL(dblink_conn_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(dblink_conn_->ping())) {
LOG_WARN("failed to ping dblink_conn_", K(ret));
}
}
if (OB_SUCC(ret)) {
ret = OB_ITER_END;
}
reset_result(); reset_result();
} }
} else { } else {
@ -400,6 +375,7 @@ int ObLinkScanOp::inner_get_next_batch(const int64_t max_row_cnt)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t row_cnt = 0; int64_t row_cnt = 0;
clear_evaluated_flag();
if (iter_end_) { if (iter_end_) {
brs_.size_ = 0; brs_.size_ = 0;
brs_.end_ = true; brs_.end_ = true;
@ -415,14 +391,15 @@ int ObLinkScanOp::inner_get_next_batch(const int64_t max_row_cnt)
LOG_WARN("inner get next row failed", K(ret)); LOG_WARN("inner get next row failed", K(ret));
} }
} else { } else {
const ObIArray<ObExpr *> &output = spec_.output_; const ObIArray<ObExpr *> &select_exprs =
for (int64_t i = 0; OB_SUCC(ret) && i < output.count(); i++) { (MY_SPEC.select_exprs_.empty() ? spec_.output_ : MY_SPEC.select_exprs_);
ObExpr *expr = output.at(i); for (int64_t i = 0; OB_SUCC(ret) && i < select_exprs.count(); i++) {
if (!expr->is_const_expr() && ObExpr *expr = select_exprs.at(i);
T_FUN_SYS_REMOVE_CONST != expr->type_ && if (expr->is_const_expr()) {
T_QUESTIONMARK != expr->type_ && // do nothing
(ob_is_string_or_lob_type(expr->datum_meta_.type_) || } else if (T_QUESTIONMARK != expr->type_ &&
ob_is_raw(expr->datum_meta_.type_) || ob_is_json(expr->datum_meta_.type_))) { (ob_is_string_or_lob_type(expr->datum_meta_.type_) ||
ob_is_raw(expr->datum_meta_.type_) || ob_is_json(expr->datum_meta_.type_))) {
ObDatum &datum = expr->locate_expr_datum(eval_ctx_); ObDatum &datum = expr->locate_expr_datum(eval_ctx_);
char *buf = NULL; char *buf = NULL;
if (OB_ISNULL(buf = expr->get_str_res_mem(eval_ctx_, datum.len_))) { if (OB_ISNULL(buf = expr->get_str_res_mem(eval_ctx_, datum.len_))) {
@ -445,14 +422,6 @@ int ObLinkScanOp::inner_get_next_batch(const int64_t max_row_cnt)
brs_.size_ = row_cnt; brs_.size_ = row_cnt;
brs_.end_ = iter_end_; brs_.end_ = iter_end_;
brs_.skip_->reset(row_cnt); brs_.skip_->reset(row_cnt);
const ObIArray<ObExpr *> &output = spec_.output_;
for (int64_t i = 0; OB_SUCC(ret) && i < output.count(); i++) {
ObExpr *expr = output.at(i);
if (expr->is_batch_result()) {
ObBitVector &eval_flags = expr->get_evaluated_flags(eval_ctx_);
eval_flags.set_all(row_cnt);
}
}
} }
} }
return ret; return ret;

View File

@ -4762,7 +4762,6 @@ int ObSql::check_batched_multi_stmt_after_resolver(ObPlanCacheCtx &pc_ctx,
ObPhysicalPlanCtx *plan_ctx = NULL; ObPhysicalPlanCtx *plan_ctx = NULL;
is_valid = true; is_valid = true;
bool has_dblink = false; bool has_dblink = false;
bool has_any_dblink = false;
bool is_ps_ab_opt = pc_ctx.sql_ctx_.multi_stmt_item_.is_ab_batch_opt(); bool is_ps_ab_opt = pc_ctx.sql_ctx_.multi_stmt_item_.is_ab_batch_opt();
if (OB_ISNULL(plan_ctx = pc_ctx.exec_ctx_.get_physical_plan_ctx()) if (OB_ISNULL(plan_ctx = pc_ctx.exec_ctx_.get_physical_plan_ctx())
|| OB_ISNULL(pc_ctx.sql_ctx_.session_info_)) { || OB_ISNULL(pc_ctx.sql_ctx_.session_info_)) {
@ -4785,9 +4784,9 @@ int ObSql::check_batched_multi_stmt_after_resolver(ObPlanCacheCtx &pc_ctx,
is_valid = false; is_valid = false;
} }
if (OB_FAIL(ObDblinkUtils::has_reverse_link_or_any_dblink(&delupd_stmt, has_dblink, has_any_dblink))) { if (OB_FAIL(ObDblinkUtils::has_reverse_link_or_any_dblink(&delupd_stmt, has_dblink, true))) {
LOG_WARN("failed to check dblink in stmt", K(delupd_stmt), K(ret)); LOG_WARN("failed to check dblink in stmt", K(delupd_stmt), K(ret));
} else if (has_any_dblink) { } else if (has_dblink) {
is_valid = false; is_valid = false;
} }