fix oci memleak and add port info to dblink view and fix pull data OOM

This commit is contained in:
obdev
2023-03-24 06:41:57 +00:00
committed by ob-robot
parent 7b0fa6b582
commit d2110be61c
19 changed files with 47 additions and 131 deletions

View File

@ -8350,91 +8350,6 @@ int ObSchemaServiceSQLImpl::get_link_table_schema(const ObDbLinkSchema *dblink_s
return ret;
}
int ObSchemaServiceSQLImpl::fetch_desc_table(uint64_t dblink_id,
common::sqlclient::DblinkDriverProto &link_type,
const ObString &database_name,
const ObString &table_name,
const common::sqlclient::dblink_param_ctx &param_ctx,
sql::ObSQLSessionInfo *session_info,
ObIAllocator &alloctor,
int64_t row_idx,
int32_t &length)
{
int ret = OB_SUCCESS;
const char * sql_str_fmt = "/*$BEFPARSEdblink_req_level=1*/ desc \"%.*s\".\"%.*s\"";
ObSqlString sql;
ObMySQLResult *result = NULL;
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
common::sqlclient::ObISQLConnection *dblink_conn = NULL;
if (OB_FAIL(sql.append_fmt(sql_str_fmt, database_name.length(), database_name.ptr(),
table_name.length(), table_name.ptr()))) {
LOG_WARN("append sql failed", K(ret));
} else if (OB_FAIL(dblink_proxy_->acquire_dblink(dblink_id, link_type, param_ctx, dblink_conn, session_info->get_sessid(), 1))) {
ObDblinkUtils::process_dblink_errno(link_type, ret);
LOG_WARN("failed to acquire dblink", K(ret), K(dblink_id));
} else if (OB_FAIL(session_info->get_dblink_context().register_dblink_conn_pool(dblink_conn->get_common_server_pool()))) {
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
} else if (OB_FAIL(dblink_proxy_->dblink_read(dblink_conn, res, sql.ptr()))) {
ObDblinkUtils::process_dblink_errno(link_type, dblink_conn, ret);
LOG_WARN("read link failed", K(ret), K(dblink_id), K(sql.ptr()));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get result", K(ret));
} else if (OB_FAIL(result->set_expected_charset_id(static_cast<uint16_t>(common::ObNlsCharsetId::CHARSET_AL32UTF8_ID),
static_cast<uint16_t>(common::ObNlsCharsetId::CHARSET_AL32UTF8_ID)))) {
// dblink will convert the result to AL32UTF8 when pulling schema meta
LOG_WARN("failed to set expected charset id", K(ret));
} else if (row_idx < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid row_idx", K(ret), K(row_idx));
} else {
while(row_idx >= 0 && OB_SUCC(ret)) {
if (OB_FAIL(result->next())) {
LOG_WARN("failed to get next row", K(ret));
}
--row_idx;
}
if (-1 == row_idx && OB_SUCC(ret)) {
const ObTimeZoneInfo *tz_info = TZ_INFO(session_info);
ObObj value;
ObString string_value;
if (OB_ISNULL(tz_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tz info is NULL", K(ret));
} else if (OB_FAIL(result->get_obj(1, value, tz_info, &alloctor))) {
LOG_WARN("failed to get obj", K(ret));
} else if (ObVarcharType != value.get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("type is invalid", K(value.get_type()), K(ret));
} else if (OB_FAIL(value.get_varchar(string_value))) {
LOG_WARN("failed to get varchar value", K(ret));
} else if (OB_FAIL(ObDblinkService::get_length_from_type_text(string_value, length))) {
LOG_WARN("failed to get length", K(ret));
} else {
LOG_DEBUG("desc table type string", K(string_value), K(length));
}
} else {
// do nothing
}
}
if (NULL != dblink_conn) {
int tmp_ret = OB_SUCCESS;
if (DBLINK_DRV_OB == link_type &&
NULL != result &&
OB_SUCCESS != (tmp_ret = result->close())) {
LOG_WARN("failed to close result", K(tmp_ret));
}
if (OB_SUCCESS != (tmp_ret = dblink_proxy_->release_dblink(link_type, dblink_conn, session_info->get_sessid()))) {
LOG_WARN("failed to relese connection", K(tmp_ret));
}
if (OB_SUCC(ret)) {
ret = tmp_ret;
}
}
}
return ret;
}
template<typename T>
int ObSchemaServiceSQLImpl::fetch_link_table_info(uint64_t tenant_id,
uint64_t dblink_id,