[FEAT MERGE] implement mysql dblink and read consistency

Co-authored-by: xianyu-w <707512433@qq.com>
Co-authored-by: sdc <njucssdc@gmail.com>
Co-authored-by: seuwebber <webber_code@163.com>
This commit is contained in:
cqliang1995
2023-05-09 18:32:03 +00:00
committed by ob-robot
parent 38b78ad442
commit 4108e781d4
132 changed files with 2726 additions and 818 deletions

View File

@ -84,6 +84,42 @@ int ObDblinkService::get_length_from_type_text(ObString &type_text, int32_t &len
return ret;
}
int ObDblinkService::get_set_sql_mode_cstr(sql::ObSQLSessionInfo *session_info, const char *&set_sql_mode_cstr, ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
ObSqlString set_sql_mode_sql;
ObObj sql_mode_int_obj;
ObObj sql_mode_str_obj;
const char *set_sql_mode_fmt = "SET SESSION sql_mode = '%.*s'";
void *buf = NULL;
int64_t copy_len = 0;
if (lib::is_oracle_mode()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("only used in mysql mode", K(ret));
} else if (OB_ISNULL(session_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null ptr", K(ret));
} else if (OB_FAIL(session_info->get_sys_variable(share::SYS_VAR_SQL_MODE, sql_mode_int_obj))) {
LOG_WARN("failed to get SYS_VAR_SET_REVERSE_DBLINK_INFOS", K(sql_mode_int_obj), K(ret));
} else if (OB_FAIL(ob_sql_mode_to_str(sql_mode_int_obj, sql_mode_str_obj, &allocator))) {
LOG_WARN("failed sql mode to str", K(sql_mode_int_obj), K(ret));
} else if (OB_FAIL(set_sql_mode_sql.append_fmt(set_sql_mode_fmt,
sql_mode_str_obj.val_len_,
sql_mode_str_obj.v_.string_))) {
LOG_WARN("append sql failed", K(ret), K(sql_mode_str_obj));
} else if (FALSE_IT([&]{ copy_len = set_sql_mode_sql.length(); }())) {
} else if (OB_ISNULL(buf = allocator.alloc(copy_len + 1))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret), K(copy_len));
} else {
MEMCPY(buf, set_sql_mode_sql.ptr(), copy_len);
char *sql_cstr = static_cast<char *>(buf);
sql_cstr[copy_len] = 0;
set_sql_mode_cstr = sql_cstr;
}
return ret;
}
ObReverseLink::ObReverseLink()
: user_(),
tenant_(),
@ -214,13 +250,10 @@ int ObReverseLink::open(int64_t session_sql_req_level)
(void)snprintf(db_pass_, sizeof(db_pass_), "%.*s", passwd_.length(), passwd_.ptr());
LOG_DEBUG("open reverse link connection", K(ret), K(db_user_), K(db_pass_), K(addr_));
if (OB_FAIL(reverse_conn_.connect(db_user_, db_pass_, "", addr_, 10, true, session_sql_req_level))) { //just set connect timeout to 10s, read and write have not timeout
ObDblinkUtils::process_dblink_errno(common::sqlclient::DBLINK_DRV_OB, ret);
LOG_WARN("failed to open reverse link connection", K(ret), K(db_user_), K(db_pass_), K(addr_));
} else if (OB_FAIL(reverse_conn_.set_timeout_variable(LONG_QUERY_TIMEOUT, common::sqlclient::ObMySQLConnectionPool::DEFAULT_TRANSACTION_TIMEOUT_US))) {
ObDblinkUtils::process_dblink_errno(common::sqlclient::DBLINK_DRV_OB, ret);
LOG_WARN("failed to set reverse link connection's timeout", K(ret));
} else if (OB_FAIL(ObDbLinkProxy::execute_init_sql(&reverse_conn_, common::sqlclient::DBLINK_DRV_OB))) {
ObDblinkUtils::process_dblink_errno(common::sqlclient::DBLINK_DRV_OB, ret);
LOG_WARN("failed to init reverse link connection", K(ret));
} else {
is_close_ = false;
@ -241,8 +274,6 @@ int ObReverseLink::read(const char *sql, ObISQLClient::ReadResult &res)
ret = OB_NOT_INIT;
LOG_WARN("reverse link connection is closed", K(ret));
} else if (OB_FAIL(reverse_conn_.execute_read(OB_INVALID_TENANT_ID, sql, res))) {
int dblink_errno = ret;
ObDblinkUtils::process_dblink_errno(common::sqlclient::DblinkDriverProto::DBLINK_DRV_OB, &reverse_conn_, ret);
LOG_WARN("faild to read by reverse link connection", K(ret), K(sql));
} else {
LOG_DEBUG("succ to read by reverse link connection", K(ret), K(sql));
@ -261,57 +292,6 @@ int ObReverseLink::close()
return ret;
}
int ObDblinkUtils::process_dblink_errno(common::sqlclient::DblinkDriverProto dblink_type, common::sqlclient::ObISQLConnection *dblink_conn, int &ob_errno) {
// The purpose of adding the process_dblink_errno function
// is to distinguish the errno processing when dblink connects to other types of databases.
const int orcl_errno = ob_errno;
switch (dblink_type) {
case common::sqlclient::DblinkDriverProto::DBLINK_DRV_OB: {
if (OB_UNLIKELY(NULL == dblink_conn)) {
get_ob_errno_from_oracle_errno(orcl_errno, NULL, ob_errno);
} else {
get_ob_errno_from_oracle_errno(orcl_errno, mysql_error(static_cast<common::sqlclient::ObMySQLConnection *>(dblink_conn)->get_handler()), ob_errno);
}
break;
}
case common::sqlclient::DblinkDriverProto::DBLINK_DRV_OCI: {
get_ob_errno_from_oracle_errno(orcl_errno, NULL, ob_errno);
}
default: {
//nothing
break;
}
}
// some oracle error code can not translate to oceanbase error code,
// so use OB_ERR_DBLINK_REMOTE_ECODE to represent those oracle error code.
if (orcl_errno == ob_errno &&
// error code -40xx will report from code in deps/, need skip it
(ob_errno != -4016 || ob_errno != -4012 ||
ob_errno != -4013 || ob_errno != -4002 || ob_errno != -4007)) {
LOG_USER_ERROR(OB_ERR_DBLINK_REMOTE_ECODE, orcl_errno);
ob_errno = OB_ERR_DBLINK_REMOTE_ECODE;
}
return OB_SUCCESS;
}
int ObDblinkUtils::process_dblink_errno(common::sqlclient::DblinkDriverProto dblink_type, int &ob_errno) {
// The purpose of adding the process_dblink_errno function
// is to distinguish the errno processing when dblink connects to other types of databases.
const int orcl_errno = ob_errno;
get_ob_errno_from_oracle_errno(orcl_errno, NULL, ob_errno);
// some oracle error code can not translate to oceanbase error code,
// so use OB_ERR_DBLINK_REMOTE_ECODE to represent those oracle error code.
if (-4016 == ob_errno ||
-4012 == ob_errno ||
-4002 == ob_errno) {
// do nothing
} else if (orcl_errno == ob_errno) {
LOG_USER_ERROR(OB_ERR_DBLINK_REMOTE_ECODE, orcl_errno);
ob_errno = OB_ERR_DBLINK_REMOTE_ECODE;
}
return OB_SUCCESS;
}
int ObDblinkUtils::has_reverse_link_or_any_dblink(const ObDMLStmt *stmt, bool &has, bool has_any_dblink) {
int ret = OB_SUCCESS;
const common::ObIArray<TableItem*> &table_items = stmt->get_table_items();