dblink_write implement [FEAT MERGE]

Co-authored-by: zzg19950727 <1071026277@qq.com>
Co-authored-by: xianyu-w <707512433@qq.com>
This commit is contained in:
obdev
2023-01-29 16:23:18 +08:00
committed by ob-robot
parent 195ab500ab
commit 814de27a42
226 changed files with 9602 additions and 4087 deletions

View File

@ -19,6 +19,10 @@
#include "lib/mysqlclient/ob_mysql_connection.h"
#include "lib/mysqlclient/ob_mysql_connection_pool.h"
#include "sql/ob_sql_utils.h"
#include "sql/dblink/ob_tm_service.h"
#include "sql/dblink/ob_dblink_utils.h"
#include "lib/string/ob_hex_utils_base.h"
#include "sql/session/ob_sql_session_mgr.h"
#include "sql/engine/expr/ob_expr_lob_utils.h"
namespace oceanbase
{
@ -30,93 +34,36 @@ using namespace share::schema;
namespace sql
{
const int64_t ObLinkScanOp::STMT_BUF_BLOCK = 1024L;
ObLinkScanSpec::ObLinkScanSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type)
: ObLinkSpec(alloc, type), has_for_update_(false)
{}
OB_SERIALIZE_MEMBER((ObLinkScanSpec, ObLinkSpec));
ObLinkScanOp::ObLinkScanOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
: ObOperator(exec_ctx, spec, input),
tenant_id_(OB_INVALID_ID),
dblink_id_(OB_INVALID_ID),
dblink_proxy_(NULL),
dblink_conn_(NULL),
: ObLinkOp(exec_ctx, spec, input),
result_(NULL),
allocator_(exec_ctx.get_allocator()),
tz_info_(NULL),
stmt_buf_(NULL),
stmt_buf_len_(STMT_BUF_BLOCK),
iter_end_(false),
row_allocator_(),
link_type_(DBLINK_DRV_OB),
elapse_time_(0),
sessid_(0),
iterated_rows_(-1)
{}
iterated_rows_(-1),
tm_session_(NULL),
tm_rm_connection_(NULL),
reverse_link_(NULL),
conn_type_(sql::DblinkGetConnType::DBLINK_POOL)
{
}
void ObLinkScanOp::reset()
{
tz_info_ = NULL;
dblink_schema_ = NULL;
reset_result();
reset_stmt();
reset_link_sql();
reset_dblink();
row_allocator_.reset();
}
int ObLinkScanOp::init_dblink(uint64_t dblink_id, ObDbLinkProxy *dblink_proxy)
{
int ret = OB_SUCCESS;
ObSchemaGetterGuard schema_guard;
const ObDbLinkSchema *dblink_schema = NULL;
dblink_param_ctx param_ctx;
ObSQLSessionInfo * my_session = NULL;
my_session = ctx_.get_my_session();
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
if (OB_NOT_NULL(dblink_proxy_)) {
ret = OB_INIT_TWICE;
LOG_WARN("link scan ctx already inited", K(ret));
} else if (OB_ISNULL(dblink_proxy)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("dblink_proxy is NULL", K(ret));
} else if (OB_ISNULL(my_session) || OB_ISNULL(plan_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("my_session or plan_ctx is NULL", K(my_session), K(plan_ctx), K(ret));
} else if (FALSE_IT(sessid_ = my_session->get_sessid())) {
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) {
LOG_WARN("failed to get schema guard", K(ret), K(tenant_id_));
} else if (OB_FAIL(schema_guard.get_dblink_schema(tenant_id_, dblink_id, dblink_schema))) {
LOG_WARN("failed to get dblink schema", K(ret), K(tenant_id_), K(dblink_id));
} else if (OB_ISNULL(dblink_schema)) {
ret = OB_DBLINK_NOT_EXIST_TO_ACCESS;
LOG_WARN("dblink schema is NULL", K(ret), K(dblink_id));
} else if (FALSE_IT(set_link_driver_proto(static_cast<DblinkDriverProto>(dblink_schema->get_driver_proto())))) {
// do nothing
} else if (OB_FAIL(ObLinkScanOp::init_dblink_param_ctx(ctx_, param_ctx))) {
LOG_WARN("failed to init dblink param ctx", K(ret));
} else if (OB_FAIL(dblink_proxy->create_dblink_pool(tenant_id_,
dblink_id,
link_type_,
dblink_schema->get_host_addr(),
dblink_schema->get_tenant_name(),
dblink_schema->get_user_name(),
dblink_schema->get_password(),
ObString(""),
dblink_schema->get_conn_string(),
dblink_schema->get_cluster_name(),
param_ctx))) {
LOG_WARN("failed to create dblink pool", K(ret));
} else if (OB_FAIL(dblink_proxy->acquire_dblink(dblink_id, link_type_, dblink_conn_, sessid_, (plan_ctx->get_timeout_timestamp() - ObTimeUtility::current_time()) / 1000000))) {
ObDblinkUtils::process_dblink_errno(link_type_, dblink_conn_, ret);
LOG_WARN("failed to acquire dblink", K(ret), K(dblink_id));
} else if (OB_FAIL(my_session->register_dblink_conn_pool(dblink_conn_->get_common_server_pool()))) {
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
} else if (OB_ISNULL(stmt_buf_ = static_cast<char *>(allocator_.alloc(stmt_buf_len_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to init stmt buf", K(ret), K(stmt_buf_len_));
} else {
dblink_id_ = dblink_id;
dblink_proxy_ = dblink_proxy;
}
return ret;
}
int ObLinkScanOp::init_tz_info(const ObTimeZoneInfo *tz_info)
{
int ret = OB_SUCCESS;
@ -129,332 +76,152 @@ int ObLinkScanOp::init_tz_info(const ObTimeZoneInfo *tz_info)
return ret;
}
/*
* see these two plans below:
*
* the 1st one is simple scan for remote table, upper operator WILL NOT call rescan().
*
* the 2nd one is subplan filter with rescan for remote table, upper operator WILL call rescan(),
* and the most important thing is we can get valid param_store ONLY after rescan().
*
* A. so we should call read() in inner_open() for 1st case, and in rescan() for 2nd case,
* B. or we should call read() in inner_get_next_row() for both cases?
*
* I have tried plan A, but I found param_count.count() is NOT 0 in both cases, and there is
* nothing else can help make this decision, so I choose plan B now.
*
* explain
* select * from s1 where c1 = concat('a', 'aa');
*
* ======================================
* |ID|OPERATOR |NAME|EST. ROWS|COST |
* --------------------------------------
* |0 |LINK | |0 |113109|
* |1 | TABLE SCAN|T1 |990 |113109|
* ======================================
*
* Outputs & filters:
* -------------------------------------
* 0 - output([T1.PK], [T1.C1], [T1.C2], [T1.C3]), filter(nil), dblink_id=1100611139403793,
* link_stmt=select "T1"."PK", "T1"."C1", "T1"."C2", "T1"."C3" from T1 where ("T1"."C1" = CONCAT('a','aa'))
* 1 - output([T1.C1], [T1.PK], [T1.C2], [T1.C3]), filter([T1.C1 = ?]),
* access([T1.C1], [T1.PK], [T1.C2], [T1.C3]), partitions(p0)
*
* explain
* select * from t1 where pk = (select pk from s2 where c1 = t1.c1);
*
* =========================================
* |ID|OPERATOR |NAME|EST. ROWS|COST |
* -----------------------------------------
* |0 |SUBPLAN FILTER| |1 |221841|
* |1 | TABLE SCAN |T1 |2 |37 |
* |2 | LINK | |0 |110903|
* |3 | TABLE SCAN |T2 |990 |110903|
* =========================================
*
* Outputs & filters:
* -------------------------------------
* 0 - output([T1.PK], [T1.C1], [T1.C2], [T1.C3]), filter([T1.PK = subquery(1)]),
* exec_params_([T1.C1]), onetime_exprs_(nil), init_plan_idxs_(nil)
* 1 - output([T1.C1], [T1.PK], [T1.C2], [T1.C3]), filter(nil),
* access([T1.C1], [T1.PK], [T1.C2], [T1.C3]), partitions(p0)
* 2 - output([T2.PK]), filter(nil), dblink_id=1100611139403794,
* link_stmt=select "T2"."PK" from T2 where ("T2"."C1" = $000)
* 3 - output([T2.PK]), filter([T2.C1 = ?]),
* access([T2.C1], [T2.PK]), partitions(p0)
*
*
*/
int ObLinkScanOp::read(const ObString &link_stmt_fmt,
const ObIArray<ObParamPosIdx> &param_infos,
const ObParamStore &param_store)
int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
{
int ret = OB_SUCCESS;
if (OB_FAIL(combine_link_stmt(link_stmt_fmt, param_infos, param_store))) {
LOG_WARN("failed to gen link stmt", K(ret), K(link_stmt_fmt));
} else if (OB_FAIL(read(stmt_buf_))) {
LOG_WARN("failed to execute link stmt", K(ret));
}
return ret;
}
int ObLinkScanOp::read(const char *link_stmt)
{
int ret = OB_SUCCESS;
int read_ret = OB_SUCCESS;
const char *read_errmsg = NULL;
ObCommonServerConnectionPool *common_server_pool_ptr = NULL;
uint16_t charset_id = 0;
uint16_t ncharset_id = 0;
if (OB_ISNULL(dblink_proxy_) || OB_ISNULL(link_stmt)) {
ObSQLSessionInfo * my_session = NULL;
my_session = ctx_.get_my_session();
transaction::ObTransID tx_id;
bool have_lob = false;
if (OB_ISNULL(link_stmt)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dblink_proxy or link_stmt is NULL", K(ret), KP(dblink_proxy_), KP(link_stmt));
LOG_WARN("unexpected NULL", K(ret), KP(link_stmt));
} else if (sql::DblinkGetConnType::TM_CONN == conn_type_) {
if (OB_FAIL(tm_rm_connection_->execute_read(OB_INVALID_TENANT_ID, link_stmt, res_))) {
ObDblinkUtils::process_dblink_errno(DblinkDriverProto(tm_rm_connection_->get_dblink_driver_proto()), ret);
LOG_WARN("failed to read table data by tm_rm_connection", K(ret), K(link_stmt), K(DblinkDriverProto(tm_rm_connection_->get_dblink_driver_proto())));
} else {
LOG_DEBUG("succ to read table data by tm_rm_connection", K(link_stmt), K(DblinkDriverProto(tm_rm_connection_->get_dblink_driver_proto())));
}
} else if (sql::DblinkGetConnType::TEMP_CONN == conn_type_) {
if (OB_FAIL(reverse_link_->read(link_stmt, res_))) {
LOG_WARN("failed to read table data by reverse_link", K(ret));
} else {
LOG_DEBUG("succ to read table data by reverse_link");
}
} else if (OB_ISNULL(dblink_proxy_) || OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL", K(ret), KP(dblink_proxy_), KP(my_session));
} else if (OB_FAIL(dblink_proxy_->dblink_read(dblink_conn_, res_, link_stmt))) {
ObDblinkUtils::process_dblink_errno(link_type_, dblink_conn_, ret);
LOG_WARN("read failed", K(ret), K(link_stmt));
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_ISNULL(result_ = res_.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get result", K(ret));
} else if (OB_FAIL(ObLinkScanOp::get_charset_id(ctx_, charset_id, ncharset_id))) {
} else if (DBLINK_DRV_OCI == link_type_ && ObDblinkService::check_lob_in_result(result_, have_lob)) {
LOG_WARN("failed to check lob result", K(ret));
} else if (have_lob) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("dblink not support lob type", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "dblink fetch lob type data");
} else if (OB_FAIL(ObLinkOp::get_charset_id(ctx_, charset_id, ncharset_id))) {
LOG_WARN("failed to get charset id", K(ret));
} else if (OB_FAIL(result_->set_expected_charset_id(charset_id, ncharset_id))) {
LOG_WARN("failed to set result set expected charset", K(ret), K(charset_id), K(ncharset_id));
}
return ret;
}
int ObLinkScanOp::rollback()
{
int ret = OB_SUCCESS;
reset_result();
reset_stmt();
if (OB_ISNULL(dblink_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dblink_proxy is NULL", K(ret), KP(dblink_proxy_));
} else if (OB_FAIL(dblink_proxy_->rollback(dblink_conn_))) {
LOG_WARN("failed to rollback", K(ret));
}
return ret;
}
int ObLinkScanOp::combine_link_stmt(const ObString &link_stmt_fmt,
const ObIArray<ObParamPosIdx> &param_infos,
const ObParamStore &param_store)
{
// combine link_stmt_fmt and parameter strings to final link stmt.
int ret = OB_SUCCESS;
int64_t link_stmt_pos = 0;
int64_t next_param = 0;
int64_t stmt_fmt_pos = 0;
int64_t stmt_fmt_next_param_pos = (next_param < param_infos.count() ?
param_infos.at(next_param).pos_ : link_stmt_fmt.length());
while (OB_SUCC(ret) && stmt_fmt_pos < link_stmt_fmt.length()) {
// copy from link_stmt_fmt.
if (stmt_fmt_pos < stmt_fmt_next_param_pos) {
int64_t copy_len = stmt_fmt_next_param_pos - stmt_fmt_pos;
if (link_stmt_pos + copy_len > stmt_buf_len_ &&
OB_FAIL(extend_stmt_buf(link_stmt_pos + copy_len))) {
LOG_WARN("failed to extend stmt buf", K(ret));
}
if (OB_SUCC(ret)) {
MEMCPY(stmt_buf_ + link_stmt_pos, link_stmt_fmt.ptr() + stmt_fmt_pos, copy_len);
link_stmt_pos += copy_len;
stmt_fmt_pos = stmt_fmt_next_param_pos;
}
} else if (stmt_fmt_pos == stmt_fmt_next_param_pos) {
// copy from param_store.
int64_t saved_stmt_pos = link_stmt_pos;
int64_t param_idx = param_infos.at(next_param).idx_;
const ObObjParam &param = param_store.at(param_idx);
ObObjPrintParams obj_print_params = CREATE_OBJ_PRINT_PARAM(ctx_.get_my_session());
if (DBLINK_DRV_OCI == link_type_) {
// Ensure that when oceanbase connects to oracle,
// the target character set of param is the same as that of oci connection.
obj_print_params.cs_type_ = ctx_.get_my_session()->get_nls_collation();
}
obj_print_params.need_cast_expr_ = true;
while (OB_SUCC(ret) && link_stmt_pos == saved_stmt_pos) {
//Previously, the format parameter of the print sql literal function was NULL.
//In the procedure scenario, when dblink reverse spell trunc(date type), it will treat the date type as a string,
//so correct formatting parameter obj_print_params need to be given.
if (OB_FAIL(param.print_sql_literal(stmt_buf_, stmt_buf_len_, link_stmt_pos, obj_print_params))) {
if (ret == OB_SIZE_OVERFLOW) {
ret = OB_SUCCESS;
if (OB_FAIL(extend_stmt_buf())) {
LOG_WARN("failed to extend stmt buf", K(ret), K(param));
} else {
// databuff_printf() will set link_stmt_pos to stmt_buf_len_ - 1,
// so we need load the saved_stmt_pos and retry.
link_stmt_pos = saved_stmt_pos;
}
} else {
LOG_WARN("failed to print param", K(ret), K(param));
}
} else {
next_param++;
stmt_fmt_pos += ObLinkStmtParam::get_param_len();
stmt_fmt_next_param_pos = (next_param < param_infos.count() ?
param_infos.at(next_param).pos_ : link_stmt_fmt.length());
}
} // while
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fmt_pos should not be greater than fmt_next_param_pos", K(ret),
K(stmt_fmt_pos), K(stmt_fmt_next_param_pos));
}
}
if (OB_SUCC(ret)) {
stmt_buf_[link_stmt_pos++] = 0;
}
return ret;
}
int ObLinkScanOp::extend_stmt_buf(int64_t need_size)
{
int ret = OB_SUCCESS;
int64_t alloc_size = (need_size > stmt_buf_len_) ?
(need_size / STMT_BUF_BLOCK + 1) * STMT_BUF_BLOCK :
stmt_buf_len_ + STMT_BUF_BLOCK;
char *alloc_buf = static_cast<char *>(allocator_.alloc(alloc_size));
if (OB_ISNULL(alloc_buf)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to extend stmt buf", K(ret), K(alloc_size));
} else {
MEMCPY(alloc_buf, stmt_buf_, stmt_buf_len_);
allocator_.free(stmt_buf_);
stmt_buf_ = alloc_buf;
stmt_buf_len_ = alloc_size;
LOG_DEBUG("succ to dblink read", K(link_stmt), KP(dblink_conn_));
}
return ret;
}
void ObLinkScanOp::reset_dblink()
{
if (OB_NOT_NULL(dblink_proxy_) && OB_NOT_NULL(dblink_conn_)) {
dblink_proxy_->release_dblink(link_type_, dblink_conn_, sessid_);
int tmp_ret = OB_SUCCESS;
if (OB_NOT_NULL(dblink_proxy_) && OB_NOT_NULL(dblink_conn_) && !in_xa_trascaction_ &&
OB_SUCCESS != (tmp_ret = dblink_proxy_->release_dblink(link_type_, dblink_conn_, sessid_))) {
LOG_WARN("failed to release connection", K(tmp_ret));
}
if (OB_NOT_NULL(reverse_link_)) {
reverse_link_->close();
}
tenant_id_ = OB_INVALID_ID;
dblink_id_ = OB_INVALID_ID;
dblink_proxy_ = NULL;
dblink_conn_ = NULL;
tm_rm_connection_ = NULL;
reverse_link_ = NULL;
sessid_ = 0;
}
void ObLinkScanOp::reset_stmt()
{
if (OB_NOT_NULL(stmt_buf_)) {
stmt_buf_[0] = 0;
}
conn_type_ = sql::DblinkGetConnType::DBLINK_POOL;
}
void ObLinkScanOp::reset_result()
{
if (OB_NOT_NULL(result_)) {
if (DBLINK_DRV_OB == link_type_) {
result_->close();
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = result_->close())) {
LOG_WARN("failed to close result", K(tmp_ret));
}
}
result_ = NULL;
res_.reset();
}
}
OB_DEF_SERIALIZE(ObLinkScanSpec)
{
int ret = OB_SUCCESS;
BASE_SER((ObLinkScanSpec, ObOpSpec));
LST_DO_CODE(OB_UNIS_ENCODE,
param_infos_,
stmt_fmt_,
dblink_id_);
return ret;
}
OB_DEF_DESERIALIZE(ObLinkScanSpec)
{
int ret = OB_SUCCESS;
BASE_DESER((ObLinkScanSpec, ObOpSpec));
LST_DO_CODE(OB_UNIS_DECODE,
param_infos_,
stmt_fmt_,
dblink_id_);
if (OB_FAIL(ret)) {
} else if (FALSE_IT(stmt_fmt_len_ = stmt_fmt_.length())) {
// nothing.
} else if (OB_ISNULL(stmt_fmt_buf_ = static_cast<char *>(allocator_.alloc(stmt_fmt_len_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc stmt_fmt_buf", K(ret), K(stmt_fmt_len_));
} else {
MEMCPY(stmt_fmt_buf_, stmt_fmt_.ptr(), stmt_fmt_len_);
stmt_fmt_.assign(stmt_fmt_buf_, static_cast<int32_t>(stmt_fmt_len_));
}
return ret;
}
OB_DEF_SERIALIZE_SIZE(ObLinkScanSpec)
{
int64_t len = 0;
BASE_ADD_LEN((ObLinkScanSpec, ObOpSpec));
LST_DO_CODE(OB_UNIS_ADD_LEN,
param_infos_,
stmt_fmt_,
dblink_id_);
return len;
}
ObLinkScanSpec::ObLinkScanSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type)
: ObOpSpec(alloc, type),
allocator_(alloc),
param_infos_(alloc),
stmt_fmt_(),
stmt_fmt_buf_(NULL),
stmt_fmt_len_(0),
dblink_id_(OB_INVALID_ID)
{}
int ObLinkScanSpec::set_stmt_fmt(const char *stmt_fmt_buf, int64_t stmt_fmt_len)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(stmt_fmt_buf) || stmt_fmt_len <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("stmt_fmt_buf is null or stmt_fmt_len is less than 0",
K(ret), KP(stmt_fmt_buf), K(stmt_fmt_len));
} else if (OB_ISNULL(stmt_fmt_buf_ = static_cast<char *>(allocator_.alloc(stmt_fmt_len)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc stmt_fmt_buf", K(ret), K(stmt_fmt_len));
} else {
MEMCPY(stmt_fmt_buf_, stmt_fmt_buf, stmt_fmt_len);
stmt_fmt_len_ = stmt_fmt_len;
stmt_fmt_.assign(stmt_fmt_buf_, static_cast<int32_t>(stmt_fmt_len_));
}
return ret;
}
int ObLinkScanSpec::set_param_infos(const ObIArray<ObParamPosIdx> &param_infos)
{
int ret = OB_SUCCESS;
param_infos_.reset();
if (param_infos.count() > 0 && OB_FAIL(param_infos_.init(param_infos.count()))) {
LOG_WARN("failed to init fixed array", K(param_infos.count()));
}
for (int64_t i = 0; OB_SUCC(ret) && i < param_infos.count(); i++) {
if (OB_FAIL(param_infos_.push_back(param_infos.at(i)))) {
LOG_WARN("failed to push back param info", K(ret), K(param_infos.at(i)));
}
}
return ret;
}
int ObLinkScanOp::inner_open()
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *session = ctx_.get_my_session();
ObPhysicalPlanCtx *plan_ctx = ctx_.get_physical_plan_ctx();
const ObPhysicalPlan *phy_plan = MY_SPEC.get_phy_plan();
int64_t tm_sessid = -1;
reverse_link_ = NULL;
stmt_buf_len_ += head_comment_length_;
if (NULL != phy_plan) {
tm_sessid = phy_plan->tm_sessid_;
}
if (OB_ISNULL(session) || OB_ISNULL(plan_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session or plan_ctx is NULL", K(ret), KP(session), KP(plan_ctx));
} else if (OB_FAIL(set_next_sql_req_level())) {
LOG_WARN("failed to set next sql req level", K(ret));
} else if (FALSE_IT(tenant_id_ = session->get_effective_tenant_id())) {
} else if (OB_FAIL(init_dblink(MY_SPEC.dblink_id_, GCTX.dblink_proxy_))) {
LOG_WARN("failed to init dblink", K(ret), K(MY_SPEC.dblink_id_));
} else if (MY_SPEC.is_reverse_link_) {
// RM process sql within @! and @xxxx! send by TM
LOG_DEBUG("link scan op, RM process sql within @! and @xxxx! send by TM");
conn_type_ = sql::DblinkGetConnType::TEMP_CONN;
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
if (OB_ISNULL(plan_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get plan ctx", K(ret));
} else if (OB_FAIL(session->get_dblink_context().get_reverse_link(reverse_link_))) {
LOG_WARN("fail to get reverse_link", K(ret));
} else if (NULL == reverse_link_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("reverse_link_ is NULL", K(ret), KP(reverse_link_), K(session->get_sessid()));
} else if (OB_FAIL(reverse_link_->open(next_sql_req_level_))) {
LOG_WARN("failed to open reverse_link", K(ret));
}
} else if (-1 != tm_sessid) { // TM process sql within @xxxx send by RM
LOG_DEBUG("link scan op, TM process sql within @xxxx send by RM", K(tm_sessid));
sql::ObSQLSessionMgr *session_mgr = GCTX.session_mgr_;
if (OB_ISNULL(session_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret), KP(session_mgr));
} else if (OB_FAIL(session_mgr->get_session(static_cast<uint32_t>(tm_sessid), tm_session_))) {
LOG_WARN("failed to get session", K(ret), K(tm_sessid));
} else {
if (NULL != tm_session_ &&
tm_session_->get_dblink_context().get_dblink_conn(MY_SPEC.dblink_id_, tm_rm_connection_)) {
LOG_WARN("failed to get dblink connection from session", KP(tm_session_), K(ret));
} else if (NULL != tm_rm_connection_){
conn_type_ = sql::DblinkGetConnType::TM_CONN;
LOG_DEBUG("get tm sesseion and connection", KP(tm_session_), KP(tm_rm_connection_));
}
session_mgr->revert_session(tm_session_);
tm_session_ = NULL;
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (sql::DblinkGetConnType::DBLINK_POOL == conn_type_ &&
OB_FAIL(init_dblink(MY_SPEC.dblink_id_, GCTX.dblink_proxy_))) {
LOG_WARN("failed to init dblink", K(ret), K(MY_SPEC.dblink_id_), K(MY_SPEC.is_reverse_link_));
} else if (OB_FAIL(init_tz_info(TZ_INFO(session)))) {
LOG_WARN("failed to tz info", K(ret), KP(session));
} else {
@ -462,6 +229,15 @@ int ObLinkScanOp::inner_open()
row_allocator_.set_label("linkoprow");
row_allocator_.set_ctx_id(ObCtxIds::WORK_AREA);
}
if (OB_SUCC(ret) && OB_ISNULL(stmt_buf_ = static_cast<char *>(allocator_.alloc(stmt_buf_len_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to init stmt buf", K(ret), K(stmt_buf_len_));
}
// close reverse_link
if (NULL != reverse_link_ && OB_FAIL(ret)) {
reverse_link_->close();
LOG_DEBUG("close reverse link", KP(reverse_link_), K(ret));
}
return ret;
}
@ -502,7 +278,11 @@ int ObLinkScanOp::inner_get_next_row()
if (OB_ISNULL(plan_ctx)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get plan ctx", K(ret));
} else if (need_read() && OB_FAIL(read(stmt_fmt, param_infos, plan_ctx->get_param_store()))) {
} else if (need_read() &&
OB_FAIL(execute_link_stmt(stmt_fmt,
param_infos,
plan_ctx->get_param_store(),
reverse_link_))) {
LOG_WARN("failed to execute link stmt", K(ret), K(stmt_fmt), K(param_infos));
} else if (OB_ISNULL(result_)) {
ret = OB_NOT_INIT;
@ -550,7 +330,6 @@ int ObLinkScanOp::inner_get_next_row()
}
}
}
return ret;
}
@ -558,18 +337,14 @@ int ObLinkScanOp::inner_get_next_row()
int ObLinkScanOp::inner_close()
{
int ret = OB_SUCCESS;
if (OB_FAIL(rollback())) {
LOG_WARN("failed to execute rollback", K(ret));
} else {
reset();
}
reset();
return ret;
}
int ObLinkScanOp::inner_rescan()
{
reset_result();
reset_stmt();
reset_link_sql();
iter_end_ = false;
iterated_rows_ = -1;
return ObOperator::inner_rescan();
@ -633,46 +408,5 @@ int ObLinkScanOp::inner_get_next_batch(const int64_t max_row_cnt)
return ret;
}
int ObLinkScanOp::init_dblink_param_ctx(ObExecContext &exec_ctx, dblink_param_ctx &param_ctx)
{
int ret = OB_SUCCESS;
uint16_t charset_id = 0;
uint16_t ncharset_id = 0;
if (OB_FAIL(get_charset_id(exec_ctx, charset_id, ncharset_id))) {
LOG_WARN("failed to get session charset id", K(ret));
} else {
param_ctx.charset_id_ = charset_id;
param_ctx.ncharset_id_ = ncharset_id;
param_ctx.pool_type_ = DblinkPoolType::DBLINK_POOL_DEF;
}
return ret;
}
int ObLinkScanOp::get_charset_id(ObExecContext &exec_ctx,
uint16_t &charset_id, uint16_t &ncharset_id)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *sess_info = NULL;
if (OB_ISNULL(sess_info = exec_ctx.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null session info", K(ret));
} else {
ObCollationType coll_type = sess_info->get_nls_collation();
ObCollationType ncoll_type = sess_info->get_nls_collation_nation();
ObCharsetType cs_type = ObCharset::charset_type_by_coll(coll_type);
ObCharsetType ncs_type = ObCharset::charset_type_by_coll(ncoll_type);
if (CHARSET_INVALID == cs_type || CHARSET_INVALID == ncs_type) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get charset id", K(ret), K(coll_type));
} else {
charset_id = static_cast<uint16_t>(ObCharset::charset_type_to_ora_charset_id(cs_type));
ncharset_id = static_cast<uint16_t>(ObCharset::charset_type_to_ora_charset_id(ncs_type));
LOG_DEBUG("get charset id", K(ret), K(charset_id), K(ncharset_id),
K(cs_type), K(ncs_type), K(coll_type), K(ncoll_type));
}
}
return ret;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -1,55 +1,32 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
// Copyright 2021 Alibaba Inc. All Rights Reserved.
// Author:
// shanting <dachuan.sdc@antgroup.com>
#ifndef OCEANBASE_SQL_ENGINE_LINK_SCAN_OP_H_
#define OCEANBASE_SQL_ENGINE_LINK_SCAN_OP_H_
#include "sql/engine/ob_operator.h"
#include "sql/ob_sql_utils.h"
#include "lib/mysqlclient/ob_mysql_result.h"
#include "sql/engine/dml/ob_link_op.h"
namespace oceanbase
{
namespace sql
{
class ObLinkScanSpec : public ObOpSpec
class ObLinkScanSpec : public ObLinkSpec
{
OB_UNIS_VERSION_V(1);
public:
explicit ObLinkScanSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type);
int set_param_infos(const common::ObIArray<ObParamPosIdx> &param_infos);
int set_stmt_fmt(const char *stmt_fmt_buf, int64_t stmt_fmt_len);
common::ObIAllocator &allocator_;
common::ObFixedArray<ObParamPosIdx, common::ObIAllocator> param_infos_;
common::ObString stmt_fmt_;
char *stmt_fmt_buf_;
int64_t stmt_fmt_len_;
uint64_t dblink_id_;
bool has_for_update_;
};
class ObLinkScanOp : public ObOperator
class ObLinkScanOp : public ObLinkOp
{
public:
static constexpr int64_t CHECK_STATUS_ROWS_INTERVAL = 1 << 10;
typedef common::ParamStore ObParamStore;
explicit ObLinkScanOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input);
virtual ~ObLinkScanOp() { destroy(); }
virtual void destroy() { reset(); }
virtual int inner_open() override;
virtual int inner_get_next_row() override;
@ -57,49 +34,29 @@ public:
virtual int inner_rescan() override;
virtual int inner_get_next_batch(const int64_t max_row_cnt) override;
inline void set_link_driver_proto(common::sqlclient::DblinkDriverProto type) { link_type_ = type; }
void reset();
int init_dblink(uint64_t dblink_id, common::ObDbLinkProxy *dblink_proxy);
virtual void reset();
int init_tz_info(const common::ObTimeZoneInfo *tz_info);
int read(const common::ObString &link_stmt_fmt,
const common::ObIArray<ObParamPosIdx> &param_infos,
const ObParamStore &param_store);
bool need_read() const { return OB_ISNULL(result_); }
int read(const char *link_stmt);
int rollback();
int inner_execute_link_stmt(const char *link_stmt);
int get_next(const ObNewRow *&row);
void reset_inner();
static int init_dblink_param_ctx(ObExecContext &exec_ctx, common::sqlclient::dblink_param_ctx &param_ctx);
static int get_charset_id(ObExecContext &exec_ctx, uint16_t &charset_id, uint16_t &ncharset_id);
private:
int combine_link_stmt(const common::ObString &link_stmt_fmt,
const common::ObIArray<ObParamPosIdx> &param_infos,
const ObParamStore &param_store);
int extend_stmt_buf(int64_t need_size = 0);
void reset_dblink();
void reset_stmt();
virtual void reset_dblink() override;
void reset_result();
private:
uint64_t tenant_id_;
uint64_t dblink_id_;
common::ObDbLinkProxy *dblink_proxy_;
common::sqlclient::ObISQLConnection *dblink_conn_;
common::ObMySQLProxy::MySQLResult res_;
common::sqlclient::ObMySQLResult *result_;
common::ObIAllocator &allocator_;
const common::ObTimeZoneInfo *tz_info_;
char *stmt_buf_;
int64_t stmt_buf_len_;
bool iter_end_;
common::ObArenaAllocator row_allocator_;
static const int64_t STMT_BUF_BLOCK;
common::sqlclient::DblinkDriverProto link_type_;
int64_t elapse_time_;
uint32_t sessid_;
int64_t iterated_rows_;
ObSQLSessionInfo *tm_session_;
common::sqlclient::ObISQLConnection *tm_rm_connection_;
ObReverseLink *reverse_link_;
sql::DblinkGetConnType conn_type_;
};
} // end namespace sql
} // end namespace oceanbase
#endif /* OCEANBASE_SQL_ENGINE_LINK_SCAN_ */
#endif /* OCEANBASE_SQL_ENGINE_LINK_SCAN_ */