fix dblink bugs

This commit is contained in:
cqliang1995
2023-07-13 14:23:58 +00:00
committed by ob-robot
parent f1bcb57e3a
commit 59a7f93a3f
23 changed files with 509 additions and 300 deletions

View File

@ -120,6 +120,90 @@ int ObDblinkService::get_set_sql_mode_cstr(sql::ObSQLSessionInfo *session_info,
return ret;
}
int ObDblinkService::get_set_names_cstr(sql::ObSQLSessionInfo *session_info,
const char *&set_client_charset,
const char *&set_connection_charset,
const char *&set_results_charset)
{
int ret = OB_SUCCESS;
int64_t client = -1;
int64_t connection = -1;
int64_t results = -1;
set_client_charset = NULL;
set_connection_charset = NULL;
set_results_charset = "set character_set_results = NULL";
if (OB_ISNULL(session_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(session_info->get_sys_variable(share::SYS_VAR_CHARACTER_SET_CLIENT, client))) {
LOG_WARN("failed to get client characterset", K(ret));
} else if (OB_FAIL(session_info->get_sys_variable(share::SYS_VAR_CHARACTER_SET_CONNECTION, connection))) {
LOG_WARN("failed to get connection characterset", K(ret));
} else if (OB_FAIL(session_info->get_sys_variable(share::SYS_VAR_CHARACTER_SET_CLIENT, results))) {
LOG_WARN("failed to get results characterset", K(ret));
} else {
switch(ObCharset::charset_type_by_coll(ObCollationType(client))) {
case ObCharsetType::CHARSET_UTF8MB4:
set_client_charset = "set character_set_client = utf8mb4";
break;
case ObCharsetType::CHARSET_GBK:
set_client_charset = "set character_set_client = gbk";
break;
case ObCharsetType::CHARSET_BINARY:
set_client_charset = "set character_set_client = binary";
break;
default:
// do nothing
break;
}
switch(ObCharset::charset_type_by_coll(ObCollationType(connection))) {
case ObCharsetType::CHARSET_UTF8MB4:
set_connection_charset = "set character_set_connection = utf8mb4";
break;
case ObCharsetType::CHARSET_GBK:
set_connection_charset = "set character_set_connection = gbk";
break;
case ObCharsetType::CHARSET_BINARY:
set_connection_charset = "set character_set_connection = binary";
break;
default:
// do nothing
break;
}
switch(ObCharset::charset_type_by_coll(ObCollationType(results))) {
case ObCharsetType::CHARSET_UTF8MB4:
set_results_charset = "set character_set_results = utf8mb4";
break;
case ObCharsetType::CHARSET_GBK:
set_results_charset = "set character_set_results = gbk";
break;
case ObCharsetType::CHARSET_BINARY:
set_results_charset = "set character_set_results = binary";
break;
default:
// do nothing
break;
}
}
return ret;
}
int ObDblinkService::get_local_session_vars(sql::ObSQLSessionInfo *session_info,
ObIAllocator &allocator,
common::sqlclient::dblink_param_ctx &param_ctx)
{
int ret = OB_SUCCESS;
if (lib::is_mysql_mode() && OB_FAIL(get_set_sql_mode_cstr(session_info, param_ctx.set_sql_mode_cstr_, allocator))) {
LOG_WARN("failed to get set_sql_mode_cstr", K(ret));
} else if (OB_FAIL(get_set_names_cstr(session_info,
param_ctx.set_client_charset_cstr_,
param_ctx.set_connection_charset_cstr_,
param_ctx.set_results_charset_cstr_))) {
LOG_WARN("failed to get set_names_cstr", K(ret));
}
return ret;
}
ObReverseLink::ObReverseLink()
: user_(),
tenant_(),
@ -228,7 +312,11 @@ const int64_t ObReverseLink::LONG_QUERY_TIMEOUT = 120*1000*1000; //120 seconds
int ObReverseLink::open(int64_t session_sql_req_level)
{
int ret = OB_SUCCESS;
if (!is_close_) {
common::sqlclient::dblink_param_ctx param_ctx;
if (OB_ISNULL(session_info_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (!is_close_) {
// nothing to do
} else if (tenant_.empty() || user_.empty() || passwd_.empty() /*|| db_name.empty()*/
|| OB_UNLIKELY(cluster_.length() >= OB_MAX_CLUSTER_NAME_LENGTH)
@ -238,6 +326,8 @@ int ObReverseLink::open(int64_t session_sql_req_level)
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret),
K(cluster_), K(tenant_), K(user_), K(passwd_));
} else if (OB_FAIL(ObDblinkService::get_local_session_vars(session_info_, allocator_, param_ctx))) {
LOG_WARN("failed to get local session vars", K(ret));
} else {
if (cluster_.empty()) {
(void)snprintf(db_user_, sizeof(db_user_), "%.*s@%.*s", user_.length(), user_.ptr(),
@ -249,11 +339,12 @@ int ObReverseLink::open(int64_t session_sql_req_level)
}
(void)snprintf(db_pass_, sizeof(db_pass_), "%.*s", passwd_.length(), passwd_.ptr());
LOG_DEBUG("open reverse link connection", K(ret), K(db_user_), K(db_pass_), K(addr_));
param_ctx.link_type_ = common::sqlclient::DBLINK_DRV_OB;
if (OB_FAIL(reverse_conn_.connect(db_user_, db_pass_, "", addr_, 10, true, session_sql_req_level))) { //just set connect timeout to 10s, read and write have not timeout
LOG_WARN("failed to open reverse link connection", K(ret), K(db_user_), K(db_pass_), K(addr_));
} else if (OB_FAIL(reverse_conn_.set_timeout_variable(LONG_QUERY_TIMEOUT, common::sqlclient::ObMySQLConnectionPool::DEFAULT_TRANSACTION_TIMEOUT_US))) {
LOG_WARN("failed to set reverse link connection's timeout", K(ret));
} else if (OB_FAIL(ObDbLinkProxy::execute_init_sql(&reverse_conn_, common::sqlclient::DBLINK_DRV_OB))) {
} else if (OB_FAIL(ObDbLinkProxy::execute_init_sql(param_ctx, &reverse_conn_))) {
LOG_WARN("failed to init reverse link connection", K(ret));
} else {
is_close_ = false;
@ -281,6 +372,18 @@ int ObReverseLink::read(const char *sql, ObISQLClient::ReadResult &res)
return ret;
}
int ObReverseLink::ping()
{
int ret = OB_SUCCESS;
if (is_close_) {
ret = OB_NOT_INIT;
LOG_WARN("reverse link connection is closed", K(ret));
} else if (OB_FAIL(reverse_conn_.ping())) {
LOG_WARN("faild to ping reverse link connection", K(ret));
}
return ret;
}
int ObReverseLink::close()
{
int ret = OB_SUCCESS;
@ -452,7 +555,7 @@ int ObDblinkCtxInSession::clean_dblink_conn(const bool force_disconnect)
LOG_WARN("server_conn_pool of dblink connection is NULL", K(this), K(dblink_conn), K(i), K(ret));
} else {
const bool need_disconnect = force_disconnect || !dblink_conn->usable();
if (OB_FAIL(server_conn_pool->release(dblink_conn, !need_disconnect, session_info_->get_sessid()))) {
if (OB_FAIL(server_conn_pool->release(dblink_conn, !need_disconnect))) {
LOG_WARN("session failed to release dblink connection", K(session_info_->get_sessid()), K(this), KP(dblink_conn), K(i), K(ret));
} else {
LOG_TRACE("session succ to release dblink connection", K(session_info_->get_sessid()), K(this), KP(dblink_conn), K(i), K(ret));
@ -507,6 +610,7 @@ int ObDblinkCtxInSession::get_reverse_link(ObReverseLink *&reverse_dblink)
} else if (OB_FAIL(reverse_dblink_->deserialize(new_buff, new_size, pos))) {
LOG_WARN("failed to deserialize reverse_dblink_", K(new_size), K(ret));
} else {
reverse_dblink_->set_session_info(session_info_);
reverse_dblink = reverse_dblink_;
LOG_DEBUG("succ to get reverse link from seesion", K(session_info_->get_sessid()), K(*reverse_dblink), KP(reverse_dblink));
}

View File

@ -32,7 +32,16 @@ class ObDblinkService
public:
static int check_lob_in_result(common::sqlclient::ObMySQLResult *result, bool &have_lob);
static int get_length_from_type_text(ObString &type_text, int32_t &length);
static int get_set_sql_mode_cstr(sql::ObSQLSessionInfo *session_info, const char *&set_sql_mode_cstr, ObIAllocator &allocator);
static int get_local_session_vars(sql::ObSQLSessionInfo *session_info,
ObIAllocator &allocator,
common::sqlclient::dblink_param_ctx &param_ctx);
static int get_set_sql_mode_cstr(sql::ObSQLSessionInfo *session_info,
const char *&set_sql_mode_cstr,
ObIAllocator &allocator);
static int get_set_names_cstr(sql::ObSQLSessionInfo *session_info,
const char *&set_client_charset,
const char *&set_connection_charset,
const char *&set_results_charset);
};
enum DblinkGetConnType {
@ -55,6 +64,7 @@ public:
inline void set_self_addr(common::ObAddr addr) { self_addr_ = addr; }
inline void set_tx_id(int64_t tx_id) { tx_id_ = tx_id; }
inline void set_tm_sessid(uint32_t tm_sessid) { tm_sessid_ = tm_sessid; }
inline void set_session_info(sql::ObSQLSessionInfo *session_info) { session_info_ = session_info; }
const ObString &get_user() { return user_; }
const ObString &get_tenant() { return tenant_; }
const ObString &get_cluster() { return cluster_; }
@ -66,6 +76,7 @@ public:
int open(int64_t session_sql_req_level);
int read(const char *sql, ObISQLClient::ReadResult &res);
int ping();
int close();
TO_STRING_KV(K_(user),
K_(tenant),
@ -95,6 +106,7 @@ private:
common::sqlclient::ObMySQLConnection reverse_conn_; // ailing.lcq to do, ObReverseLink can be used by serval connection, not just one
char db_user_[OB_MAX_USER_NAME_LENGTH + OB_MAX_TENANT_NAME_LENGTH + OB_MAX_CLUSTER_NAME_LENGTH];
char db_pass_[OB_MAX_PASSWORD_LENGTH];
sql::ObSQLSessionInfo *session_info_; // reverse link belongs to which session
};
class ObDblinkUtils

View File

@ -109,6 +109,7 @@ int ObLinkOp::init_dblink(uint64_t dblink_id, ObDbLinkProxy *dblink_proxy, bool
my_session = ctx_.get_my_session();
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
in_xa_trascaction_ = in_xa_trascaction;
dblink_id_ = dblink_id;
if (OB_NOT_NULL(dblink_proxy_)) {
ret = OB_INIT_TWICE;
LOG_WARN("link scan ctx already inited", K(ret));
@ -128,36 +129,25 @@ int ObLinkOp::init_dblink(uint64_t dblink_id, ObDbLinkProxy *dblink_proxy, bool
LOG_WARN("dblink schema is NULL", K(ret), K(dblink_id));
} else if (FALSE_IT(set_link_driver_proto(static_cast<DblinkDriverProto>(dblink_schema_->get_driver_proto())))) {
// do nothing
} else if (OB_FAIL(ObLinkOp::init_dblink_param_ctx(ctx_, param_ctx))) {
} else if (OB_FAIL(ObLinkOp::init_dblink_param_ctx(param_ctx))) {
LOG_WARN("failed to init dblink param ctx", K(ret));
} else if (OB_FAIL(dblink_proxy->create_dblink_pool(tenant_id_,
dblink_id,
link_type_,
} else if (OB_FAIL(dblink_proxy->create_dblink_pool(param_ctx,
dblink_schema_->get_host_addr(),
dblink_schema_->get_tenant_name(),
dblink_schema_->get_user_name(),
dblink_schema_->get_plain_password(),
dblink_schema_->get_database_name(),
dblink_schema_->get_conn_string(),
dblink_schema_->get_cluster_name(),
param_ctx))) {
dblink_schema_->get_cluster_name()))) {
LOG_WARN("failed to create dblink pool", K(ret));
} else if (OB_FAIL(my_session->get_dblink_context().get_dblink_conn(dblink_id, dblink_conn))) {
LOG_WARN("failed to get dblink connection from session", K(my_session), K(sessid_), K(ret));
} else {
if (NULL == dblink_conn) {
const char *set_sql_mode_sql = NULL;
if (!is_oracle_mode() && OB_FAIL(ObDblinkService::get_set_sql_mode_cstr(my_session, set_sql_mode_sql, allocator_))) {
LOG_WARN("failed to get set_sql_mode_sql", K(ret));
} else if (OB_FAIL(dblink_proxy->acquire_dblink(tenant_id_,
dblink_id,
link_type_,
param_ctx,
dblink_conn_,
sessid_,
next_sql_req_level_,
set_sql_mode_sql))) {
LOG_WARN("failed to acquire dblink", K(ret), K(dblink_id));
if (OB_FAIL(ObDblinkService::get_local_session_vars(my_session, allocator_, param_ctx))) {
LOG_WARN("failed to get local session vars", K(ret));
} else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx, dblink_conn_))) {
LOG_WARN("failed to acquire dblink", K(ret), K(param_ctx));
} else if (OB_FAIL(my_session->get_dblink_context().register_dblink_conn_pool(dblink_conn_->get_common_server_pool()))) {
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
} else if (in_xa_trascaction_ && lib::is_oracle_mode() &&
@ -172,7 +162,6 @@ int ObLinkOp::init_dblink(uint64_t dblink_id, ObDbLinkProxy *dblink_proxy, bool
LOG_TRACE("link op get connection from xa trasaction", K(dblink_id), KP(dblink_conn_));
}
if (OB_SUCC(ret)) {
dblink_id_ = dblink_id;
dblink_proxy_ = dblink_proxy;
}
}
@ -367,23 +356,28 @@ int ObLinkSpec::set_param_infos(const ObIArray<ObParamPosIdx> &param_infos)
return ret;
}
int ObLinkOp::init_dblink_param_ctx(ObExecContext &exec_ctx, dblink_param_ctx &param_ctx)
int ObLinkOp::init_dblink_param_ctx(dblink_param_ctx &param_ctx)
{
int ret = OB_SUCCESS;
uint16_t charset_id = 0;
uint16_t ncharset_id = 0;
if (OB_FAIL(get_charset_id(exec_ctx, charset_id, ncharset_id))) {
if (OB_FAIL(get_charset_id(ctx_, charset_id, ncharset_id))) {
LOG_WARN("failed to get session charset id", K(ret));
} else {
param_ctx.charset_id_ = charset_id;
param_ctx.ncharset_id_ = ncharset_id;
param_ctx.pool_type_ = DblinkPoolType::DBLINK_POOL_DEF;
param_ctx.tenant_id_ = tenant_id_;
param_ctx.dblink_id_ = dblink_id_;
param_ctx.link_type_ = link_type_;
param_ctx.sessid_ = sessid_;
}
return ret;
}
int ObLinkOp::get_charset_id(ObExecContext &exec_ctx,
uint16_t &charset_id, uint16_t &ncharset_id)
uint16_t &charset_id,
uint16_t &ncharset_id)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *sess_info = NULL;

View File

@ -48,7 +48,7 @@ public:
const ObParamStore &param_store,
ObReverseLink *reverse_link = NULL);
virtual int inner_execute_link_stmt(const char *link_stmt) = 0;
static int init_dblink_param_ctx(ObExecContext &exec_ctx, common::sqlclient::dblink_param_ctx &param_ctx);
int init_dblink_param_ctx(common::sqlclient::dblink_param_ctx &param_ctx);
static int get_charset_id(ObExecContext &exec_ctx, uint16_t &charset_id, uint16_t &ncharset_id);
protected:
int combine_link_stmt(const common::ObString &link_stmt_fmt,

View File

@ -94,9 +94,9 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
LOG_WARN("unexpected NULL", K(ret), KP(link_stmt));
} else if (sql::DblinkGetConnType::TM_CONN == conn_type_) {
if (OB_FAIL(tm_rm_connection_->execute_read(OB_INVALID_TENANT_ID, link_stmt, res_))) {
LOG_WARN("failed to read table data by tm_rm_connection", K(ret), K(link_stmt), K(DblinkDriverProto(tm_rm_connection_->get_dblink_driver_proto())));
LOG_WARN("failed to read table data by tm_rm_connection", K(ret), K(link_stmt), K(tm_rm_connection_->get_dblink_driver_proto()));
} else {
LOG_DEBUG("succ to read table data by tm_rm_connection", K(link_stmt), K(DblinkDriverProto(tm_rm_connection_->get_dblink_driver_proto())));
LOG_DEBUG("succ to read table data by tm_rm_connection", K(link_stmt), K(tm_rm_connection_->get_dblink_driver_proto()));
}
} else if (sql::DblinkGetConnType::TEMP_CONN == conn_type_) {
if (OB_FAIL(reverse_link_->read(link_stmt, res_))) {
@ -125,7 +125,7 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
LOG_USER_ERROR(OB_NOT_SUPPORTED, "dblink fetch lob type data");
} else if (OB_FAIL(ObLinkOp::get_charset_id(ctx_, charset_id, ncharset_id))) {
LOG_WARN("failed to get charset id", K(ret));
} else if (OB_FAIL(result_->set_expected_charset_id(charset_id, ncharset_id))) {
} else if (OB_FAIL(result_->set_expected_charset_id(charset_id, ncharset_id))) {// for oci dblink set expected result charset, actually useless...
LOG_WARN("failed to set result set expected charset", K(ret), K(charset_id), K(ncharset_id));
} else {
LOG_DEBUG("succ to dblink read", K(link_stmt), KP(dblink_conn_));
@ -137,7 +137,7 @@ void ObLinkScanOp::reset_dblink()
{
int tmp_ret = OB_SUCCESS;
if (OB_NOT_NULL(dblink_proxy_) && OB_NOT_NULL(dblink_conn_) && !in_xa_trascaction_ &&
OB_SUCCESS != (tmp_ret = dblink_proxy_->release_dblink(link_type_, dblink_conn_, sessid_))) {
OB_SUCCESS != (tmp_ret = dblink_proxy_->release_dblink(link_type_, dblink_conn_))) {
LOG_WARN_RET(tmp_ret, "failed to release connection", K(tmp_ret));
}
if (OB_NOT_NULL(reverse_link_)) {
@ -302,6 +302,32 @@ int ObLinkScanOp::fetch_row()
if (OB_ITER_END != ret) {
LOG_WARN("failed to get next row", K(ret));
} else {
// check if connection is alive, if not, then OB_ITER_END is a fake errno
if (sql::DblinkGetConnType::TM_CONN == conn_type_) {
if (OB_ISNULL(tm_rm_connection_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(tm_rm_connection_->ping())) {
LOG_WARN("failed to ping tm_rm_connection_", K(ret));
}
} else if (sql::DblinkGetConnType::TEMP_CONN == conn_type_) {
if (OB_ISNULL(reverse_link_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(reverse_link_->ping())) {
LOG_WARN("failed to ping reverse_link_", K(ret));
}
} else {
if (OB_ISNULL(dblink_conn_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(dblink_conn_->ping())) {
LOG_WARN("failed to ping dblink_conn_", K(ret));
}
}
if (OB_SUCC(ret)) {
ret = OB_ITER_END;
}
reset_result();
}
} else {
@ -366,6 +392,7 @@ int ObLinkScanOp::inner_rescan()
reset_link_sql();
iter_end_ = false;
iterated_rows_ = -1;
int tmp_ret = OB_SUCCESS;
return ObOperator::inner_rescan();
}

View File

@ -84,6 +84,7 @@
#include "sql/ob_optimizer_trace_impl.h"
#include "sql/monitor/ob_sql_plan.h"
#include "sql/optimizer/ob_explain_log_plan.h"
#include "sql/dblink/ob_dblink_utils.h"
namespace oceanbase
{
@ -4752,6 +4753,8 @@ int ObSql::check_batched_multi_stmt_after_resolver(ObPlanCacheCtx &pc_ctx,
int ret = OB_SUCCESS;
ObPhysicalPlanCtx *plan_ctx = NULL;
is_valid = true;
bool has_dblink = false;
bool has_any_dblink = false;
bool is_ps_ab_opt = pc_ctx.sql_ctx_.multi_stmt_item_.is_ab_batch_opt();
if (OB_ISNULL(plan_ctx = pc_ctx.exec_ctx_.get_physical_plan_ctx())
|| OB_ISNULL(pc_ctx.sql_ctx_.session_info_)) {
@ -4774,6 +4777,12 @@ int ObSql::check_batched_multi_stmt_after_resolver(ObPlanCacheCtx &pc_ctx,
is_valid = false;
}
if (OB_FAIL(ObDblinkUtils::has_reverse_link_or_any_dblink(&delupd_stmt, has_dblink, has_any_dblink))) {
LOG_WARN("failed to check dblink in stmt", K(delupd_stmt), K(ret));
} else if (has_any_dblink) {
is_valid = false;
}
// make sure type of all the parameters are the same
if (OB_SUCC(ret) && is_valid) {
if (!is_ps_ab_opt) {

View File

@ -938,8 +938,8 @@ int ObStmtComparer::compare_basic_table_item(const ObDMLStmt *first,
|| OB_ISNULL(second) || OB_ISNULL(second_table)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("param has null", K(first), K(first_table), K(second), K(second_table));
} else if (first_table->is_basic_table() &&
second_table->is_basic_table() &&
} else if ((first_table->is_basic_table() || first_table->is_link_table()) &&
(second_table->is_basic_table() || second_table->is_link_table()) &&
first_table->ref_id_ == second_table->ref_id_ &&
first_table->flashback_query_type_ == second_table->flashback_query_type_ &&
(first_table->flashback_query_expr_ == second_table->flashback_query_expr_ ||
@ -1093,7 +1093,8 @@ int ObStmtComparer::compare_table_item(const ObDMLStmt *first,
} else {
relation = QueryRelation::QUERY_UNCOMPARABLE;
}
} else if (first_table->is_basic_table() && second_table->is_basic_table()) {
} else if ((first_table->is_basic_table() || first_table->is_link_table()) &&
(second_table->is_basic_table() || second_table->is_link_table())) {
if (OB_FAIL(compare_basic_table_item(first,
first_table,
second,

View File

@ -551,6 +551,7 @@ int ObTransformJoinElimination::create_missing_select_items(ObSelectStmt *source
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(temp_source_table), K(temp_target_table), K(ret));
} else if ((temp_source_table->is_basic_table() && temp_target_table->is_basic_table()) ||
(temp_source_table->is_link_table() && temp_target_table->is_link_table()) ||
(temp_source_table->is_temp_table() && temp_target_table->is_temp_table()) ||
(temp_source_table->is_generated_table() && temp_target_table->is_generated_table())) {
if (OB_FAIL(ObTransformUtils::merge_table_items(source_stmt,