[FEAT MERGE]sql compat feature
Co-authored-by: Zach41 <zach_41@163.com>
This commit is contained in:
@ -129,7 +129,13 @@ int ObLinkOp::init_dblink(uint64_t dblink_id, ObDbLinkProxy *dblink_proxy, bool
|
||||
LOG_WARN("dblink schema is NULL", K(ret), K(dblink_id));
|
||||
} else if (FALSE_IT(set_link_driver_proto(static_cast<DblinkDriverProto>(dblink_schema_->get_driver_proto())))) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObLinkOp::init_dblink_param_ctx(param_ctx))) {
|
||||
} else if (OB_FAIL(ObLinkOp::init_dblink_param_ctx(ctx_,
|
||||
param_ctx,
|
||||
link_type_,
|
||||
tenant_id_,
|
||||
dblink_id_,
|
||||
sessid_,
|
||||
next_sql_req_level_))) {
|
||||
LOG_WARN("failed to init dblink param ctx", K(ret));
|
||||
} else if (OB_FAIL(dblink_proxy->create_dblink_pool(param_ctx,
|
||||
dblink_schema_->get_host_addr(),
|
||||
@ -356,22 +362,28 @@ int ObLinkSpec::set_param_infos(const ObIArray<ObParamPosIdx> ¶m_infos)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLinkOp::init_dblink_param_ctx(dblink_param_ctx ¶m_ctx)
|
||||
int ObLinkOp::init_dblink_param_ctx(ObExecContext &exec_ctx,
|
||||
common::sqlclient::dblink_param_ctx ¶m_ctx,
|
||||
common::sqlclient::DblinkDriverProto link_type,
|
||||
uint64_t tenant_id,
|
||||
uint64_t dblink_id,
|
||||
uint32_t session_id,
|
||||
int64_t next_sql_req_level)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint16_t charset_id = 0;
|
||||
uint16_t ncharset_id = 0;
|
||||
if (OB_FAIL(get_charset_id(ctx_, charset_id, ncharset_id))) {
|
||||
if (OB_FAIL(get_charset_id(exec_ctx, charset_id, ncharset_id))) {
|
||||
LOG_WARN("failed to get session charset id", K(ret));
|
||||
} else {
|
||||
param_ctx.charset_id_ = charset_id;
|
||||
param_ctx.ncharset_id_ = ncharset_id;
|
||||
param_ctx.pool_type_ = DblinkPoolType::DBLINK_POOL_DEF;
|
||||
param_ctx.tenant_id_ = tenant_id_;
|
||||
param_ctx.dblink_id_ = dblink_id_;
|
||||
param_ctx.link_type_ = link_type_;
|
||||
param_ctx.sessid_ = sessid_;
|
||||
param_ctx.sql_request_level_ = next_sql_req_level_;
|
||||
param_ctx.tenant_id_ = tenant_id;
|
||||
param_ctx.dblink_id_ = dblink_id;
|
||||
param_ctx.link_type_ = link_type;
|
||||
param_ctx.sessid_ = session_id;
|
||||
param_ctx.sql_request_level_ = next_sql_req_level;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -48,7 +48,13 @@ public:
|
||||
const ObParamStore ¶m_store,
|
||||
ObReverseLink *reverse_link = NULL);
|
||||
virtual int inner_execute_link_stmt(const char *link_stmt) = 0;
|
||||
int init_dblink_param_ctx(common::sqlclient::dblink_param_ctx ¶m_ctx);
|
||||
static int init_dblink_param_ctx(ObExecContext &exec_ctx,
|
||||
common::sqlclient::dblink_param_ctx ¶m_ctx,
|
||||
common::sqlclient::DblinkDriverProto link_type,
|
||||
uint64_t tenant_id,
|
||||
uint64_t dblink_id,
|
||||
uint32_t session_id,
|
||||
int64_t next_sql_req_level);
|
||||
static int get_charset_id(ObExecContext &exec_ctx, uint16_t &charset_id, uint16_t &ncharset_id);
|
||||
protected:
|
||||
int combine_link_stmt(const common::ObString &link_stmt_fmt,
|
||||
|
||||
@ -22,6 +22,7 @@ namespace oceanbase
|
||||
using namespace common;
|
||||
using namespace share;
|
||||
using namespace share::schema;
|
||||
using namespace common::sqlclient;
|
||||
namespace sql
|
||||
{
|
||||
|
||||
@ -40,7 +41,7 @@ int ObSequenceSpec::add_uniq_nextval_sequence_id(uint64_t seq_id)
|
||||
if (seq_id == nextval_seq_ids_.at(i)) {
|
||||
ret = OB_ENTRY_EXIST;
|
||||
LOG_WARN("should not add duplicated seq id to ObSequence operator",
|
||||
K(seq_id), K(ret));
|
||||
K(seq_id), K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -51,9 +52,9 @@ int ObSequenceSpec::add_uniq_nextval_sequence_id(uint64_t seq_id)
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObSequenceOp::ObSequenceOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
|
||||
: ObOperator(exec_ctx, spec, input),
|
||||
sequence_cache_(nullptr)
|
||||
ObLocalSequenceExecutor::ObLocalSequenceExecutor()
|
||||
:ObSequenceExecutor(),
|
||||
sequence_cache_(nullptr)
|
||||
{
|
||||
sequence_cache_ = &share::ObSequenceCache::get_instance();
|
||||
if (OB_ISNULL(sequence_cache_)) {
|
||||
@ -61,60 +62,35 @@ ObSequenceOp::ObSequenceOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpIn
|
||||
}
|
||||
}
|
||||
|
||||
ObSequenceOp::~ObSequenceOp()
|
||||
ObLocalSequenceExecutor::~ObLocalSequenceExecutor()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObSequenceOp::inner_open()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(init_op())) {
|
||||
LOG_WARN("initialize operator context failed", K(ret));
|
||||
} else if (OB_ISNULL(sequence_cache_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("seq cache not init", K(ret));
|
||||
} else if (get_child_cnt() > 1) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("should not have more than 1 child", K(ret));
|
||||
} else if (0 < MY_SPEC.filters_.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sequence operator should have no filter expr", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSequenceOp::inner_close()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSequenceOp::init_op()
|
||||
int ObLocalSequenceExecutor::init(ObExecContext &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTaskExecutorCtx *task_ctx = NULL;
|
||||
share::schema::ObMultiVersionSchemaService *schema_service = NULL;
|
||||
ObSQLSessionInfo *my_session = NULL;
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
if (OB_ISNULL(my_session = GET_MY_SESSION(ctx_))) {
|
||||
if (OB_ISNULL(my_session = GET_MY_SESSION(ctx))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get my session", K(ret));
|
||||
} else if (OB_ISNULL(task_ctx = GET_TASK_EXECUTOR_CTX(ctx_))) {
|
||||
} else if (OB_ISNULL(task_ctx = GET_TASK_EXECUTOR_CTX(ctx))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("task executor ctx is null", K(ret));
|
||||
} else if (OB_ISNULL(schema_service = task_ctx->schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema service is null", K(ret));
|
||||
} else if (OB_FAIL(schema_service->get_tenant_schema_guard(
|
||||
my_session->get_effective_tenant_id(),
|
||||
schema_guard))) {
|
||||
my_session->get_effective_tenant_id(),
|
||||
schema_guard))) {
|
||||
LOG_WARN("get schema guard failed", K(ret));
|
||||
} else {
|
||||
uint64_t tenant_id = my_session->get_effective_tenant_id();
|
||||
const ObIArray<uint64_t> &ids = MY_SPEC.nextval_seq_ids_;
|
||||
ARRAY_FOREACH_X(ids, idx, cnt, OB_SUCC(ret)) {
|
||||
const uint64_t seq_id = ids.at(idx);
|
||||
ARRAY_FOREACH_X(seq_ids_, idx, cnt, OB_SUCC(ret)) {
|
||||
const uint64_t seq_id = seq_ids_.at(idx);
|
||||
const ObSequenceSchema *seq_schema = nullptr;
|
||||
if (OB_FAIL(schema_guard.get_sequence_schema(
|
||||
tenant_id,
|
||||
@ -134,17 +110,24 @@ int ObSequenceOp::init_op()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSequenceOp::inner_get_next_row()
|
||||
void ObLocalSequenceExecutor::reset()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void ObLocalSequenceExecutor::destroy()
|
||||
{
|
||||
sequence_cache_ = NULL;
|
||||
}
|
||||
|
||||
int ObLocalSequenceExecutor::get_nextval(ObExecContext &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx_);
|
||||
const ObIArray<uint64_t> &ids = MY_SPEC.nextval_seq_ids_;
|
||||
if (OB_FAIL(try_get_next_row())) {
|
||||
LOG_WARN_IGNORE_ITER_END(ret, "fail get next row", K(ret));
|
||||
} else if (ids.count() != seq_schemas_.count()) {
|
||||
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
|
||||
if (seq_ids_.count() != seq_schemas_.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("id count does not match schema count",
|
||||
"id_cnt", ids.count(),
|
||||
"id_cnt", seq_ids_.count(),
|
||||
"schema_cnt", seq_schemas_.count(),
|
||||
K(ret));
|
||||
} else {
|
||||
@ -152,16 +135,15 @@ int ObSequenceOp::inner_get_next_row()
|
||||
ObArenaAllocator allocator; // nextval 临时计算内存
|
||||
// 当且仅当 select item 中有 nextval 时才需要去 cache 中更新 nextval
|
||||
// 否则直接取用 session 中的值
|
||||
ARRAY_FOREACH_X(ids, idx, cnt, OB_SUCC(ret)) {
|
||||
const uint64_t seq_id = ids.at(idx);
|
||||
ARRAY_FOREACH_X(seq_ids_, idx, cnt, OB_SUCC(ret)) {
|
||||
const uint64_t seq_id = seq_ids_.at(idx);
|
||||
// int64_t dummy_seq_value = 10240012435; // TODO: xiaochu, 设置 number 到 session 中
|
||||
ObSequenceValue seq_value;
|
||||
// 注意:这里 schema 的顺序和 ids 里面 id 的顺序是一一对应的
|
||||
// 所以可以直接用下标来寻址
|
||||
if (OB_FAIL(sequence_cache_->nextval(
|
||||
seq_schemas_.at(idx),
|
||||
allocator,
|
||||
seq_value))) {
|
||||
if (OB_FAIL(sequence_cache_->nextval(seq_schemas_.at(idx),
|
||||
allocator,
|
||||
seq_value))) {
|
||||
LOG_WARN("fail get nextval for seq", K(tenant_id), K(seq_id), K(ret));
|
||||
} else if (OB_FAIL(my_session->set_sequence_value(tenant_id, seq_id, seq_value))) {
|
||||
LOG_WARN("save seq_value to session as currval for later read fail",
|
||||
@ -172,6 +154,337 @@ int ObSequenceOp::inner_get_next_row()
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObRemoteSequenceExecutor::ObRemoteSequenceExecutor()
|
||||
:ObSequenceExecutor(),
|
||||
sessid_(0),
|
||||
link_type_(DBLINK_UNKNOWN),
|
||||
format_sql_(NULL),
|
||||
format_sql_length_(0),
|
||||
dblink_conn_(NULL)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
ObRemoteSequenceExecutor::~ObRemoteSequenceExecutor()
|
||||
{
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObRemoteSequenceExecutor::init(ObExecContext &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *my_session = NULL;
|
||||
if (OB_ISNULL(my_session = GET_MY_SESSION(ctx))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get my session", K(ret));
|
||||
}
|
||||
ARRAY_FOREACH_X(seq_ids_, idx, cnt, OB_SUCC(ret)) {
|
||||
const uint64_t seq_id = seq_ids_.at(idx);
|
||||
const ObSequenceSchema *seq_schema = nullptr;
|
||||
if (OB_FAIL(my_session->get_dblink_sequence_schema(seq_id, seq_schema))) {
|
||||
LOG_WARN("failed to get dblink sequence schema", K(ret));
|
||||
} else if (OB_ISNULL(seq_schema)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("null unexpected", K(ret));
|
||||
} else if (OB_FAIL(seq_schemas_.push_back(*seq_schema))) {
|
||||
// 注意:这里将 schema 缓存到数组里,会自动深拷贝 sequence name
|
||||
// 即使 schema guard 释放,sequence name 的内存也还有效,直到请求结束
|
||||
LOG_WARN("cache seq_schema fail", K(seq_id), K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(init_dblink_connection(ctx))) {
|
||||
LOG_WARN("failed to init dblink connection", K(ret));
|
||||
} else if (OB_FAIL(init_sequence_sql(ctx))) {
|
||||
LOG_WARN("failed to init sequence sql", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRemoteSequenceExecutor::init_dblink_connection(ObExecContext &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo * my_session = ctx.get_my_session();
|
||||
common::sqlclient::ObISQLConnection *dblink_conn = NULL;
|
||||
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx);
|
||||
ObDbLinkProxy *dblink_proxy = GCTX.dblink_proxy_;
|
||||
const ObDbLinkSchema *dblink_schema = NULL;
|
||||
uint64_t tenant_id = OB_INVALID_ID;
|
||||
ObSchemaGetterGuard schema_guard;
|
||||
dblink_param_ctx param_ctx;
|
||||
if (OB_ISNULL(dblink_proxy)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("dblink_proxy is NULL", K(ret));
|
||||
} else if (OB_ISNULL(my_session) || OB_ISNULL(plan_ctx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("my_session or plan_ctx is NULL", K(my_session), K(plan_ctx), K(ret));
|
||||
} else if (FALSE_IT(sessid_ = my_session->get_sessid())) {
|
||||
} else if (FALSE_IT(tenant_id = my_session->get_effective_tenant_id())) {
|
||||
} else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
|
||||
LOG_WARN("failed to get schema guard", K(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(schema_guard.get_dblink_schema(tenant_id, dblink_id_, dblink_schema))) {
|
||||
LOG_WARN("failed to get dblink schema", K(ret), K(tenant_id), K(dblink_id_));
|
||||
} else if (OB_ISNULL(dblink_schema)) {
|
||||
ret = OB_DBLINK_NOT_EXIST_TO_ACCESS;
|
||||
LOG_WARN("dblink schema is NULL", K(ret), K(dblink_id_));
|
||||
} else if (FALSE_IT(link_type_ = static_cast<DblinkDriverProto>(dblink_schema->get_driver_proto()))) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObLinkOp::init_dblink_param_ctx(ctx,
|
||||
param_ctx,
|
||||
link_type_,
|
||||
tenant_id,
|
||||
dblink_id_,
|
||||
sessid_,
|
||||
my_session->get_next_sql_request_level()))) {
|
||||
LOG_WARN("failed to init dblink param ctx", K(ret));
|
||||
} else if (OB_FAIL(dblink_proxy->create_dblink_pool(param_ctx,
|
||||
dblink_schema->get_host_addr(),
|
||||
dblink_schema->get_tenant_name(),
|
||||
dblink_schema->get_user_name(),
|
||||
dblink_schema->get_plain_password(),
|
||||
dblink_schema->get_database_name(),
|
||||
dblink_schema->get_conn_string(),
|
||||
dblink_schema->get_cluster_name()))) {
|
||||
LOG_WARN("failed to create dblink pool", K(ret));
|
||||
} else if (OB_FAIL(ObDblinkService::get_local_session_vars(my_session, ctx.get_allocator(), param_ctx))) {
|
||||
LOG_WARN("failed to get local session vars", K(ret));
|
||||
} else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx,
|
||||
dblink_conn_))) {
|
||||
LOG_WARN("failed to acquire dblink", K(ret), K(dblink_id_));
|
||||
} else if (OB_FAIL(my_session->get_dblink_context().register_dblink_conn_pool(dblink_conn_->get_common_server_pool()))) {
|
||||
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
|
||||
} else {
|
||||
LOG_TRACE("link op get connection from dblink pool", KP(dblink_conn_), K(lbt()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRemoteSequenceExecutor::init_sequence_sql(ObExecContext &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
if (OB_FAIL(sql.append("SELECT "))) {
|
||||
LOG_WARN("failed to append string", K(ret));
|
||||
}
|
||||
for (uint64_t i = 0; OB_SUCC(ret) && i < seq_schemas_.count(); ++i) {
|
||||
//const ObSequenceSchema *seq_schema = nullptr;
|
||||
if (OB_FAIL(sql.append_fmt(" %.*s.NEXTVAL ",
|
||||
seq_schemas_.at(i).get_sequence_name().length(),
|
||||
seq_schemas_.at(i).get_sequence_name().ptr()))) {
|
||||
LOG_WARN("failed to append string", K(ret));
|
||||
} else if (i == seq_ids_.count() - 1) {
|
||||
//do nothing
|
||||
} else if (OB_FAIL(sql.append(", "))) {
|
||||
LOG_WARN("failed to append string", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(sql.append(" FROM DUAL"))) {
|
||||
LOG_WARN("failed to append string", K(ret));
|
||||
} else if (OB_FALSE_IT(format_sql_length_ = sql.length() + 1)) {
|
||||
} else if (OB_ISNULL(format_sql_ = static_cast<char*>(ctx.get_allocator().alloc(format_sql_length_)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to alloc memory", K(ret));
|
||||
} else {
|
||||
MEMSET(format_sql_, 0, format_sql_length_);
|
||||
MEMCPY(format_sql_, sql.ptr(), sql.length());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObRemoteSequenceExecutor::reset()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void ObRemoteSequenceExecutor::destroy()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
#ifdef OB_BUILD_DBLINK
|
||||
if (DBLINK_DRV_OCI == link_type_ &&
|
||||
NULL != dblink_conn_ &&
|
||||
OB_FAIL(static_cast<ObOciConnection *>(dblink_conn_)->free_oci_stmt())) {
|
||||
LOG_WARN("failed to close oci result", K(ret));
|
||||
}
|
||||
#endif
|
||||
if (OB_NOT_NULL(GCTX.dblink_proxy_) &&
|
||||
OB_NOT_NULL(dblink_conn_) &&
|
||||
OB_FAIL(GCTX.dblink_proxy_->release_dblink(link_type_, dblink_conn_))) {
|
||||
LOG_WARN("failed to release connection", K(ret));
|
||||
}
|
||||
sessid_ = 0;
|
||||
dblink_conn_ = NULL;
|
||||
}
|
||||
|
||||
int ObRemoteSequenceExecutor::rescan()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
#ifdef OB_BUILD_DBLINK
|
||||
if (DBLINK_DRV_OCI == link_type_ &&
|
||||
NULL != dblink_conn_ &&
|
||||
OB_FAIL(static_cast<ObOciConnection *>(dblink_conn_)->free_oci_stmt())) {
|
||||
LOG_WARN("failed to close oci result", K(ret));
|
||||
}
|
||||
#endif
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRemoteSequenceExecutor::get_nextval(ObExecContext &ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
sqlclient::ObMySQLResult *result = NULL;
|
||||
ObSQLSessionInfo *my_session = GET_MY_SESSION(ctx);
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
if (OB_FAIL(rescan())) {
|
||||
LOG_WARN("failed to rescan dblink reset", K(ret));
|
||||
} else if (OB_FAIL(GCTX.dblink_proxy_->dblink_read(dblink_conn_, res, format_sql_))) {
|
||||
LOG_WARN("read failed", K(ret), K(link_type_), K(dblink_conn_), K(format_sql_));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("failed to get result", K(ret));
|
||||
} else if (OB_FAIL(result->next())) {
|
||||
LOG_WARN("failed to get next val", K(ret));
|
||||
} else {
|
||||
uint64_t tenant_id = my_session->get_effective_tenant_id();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < seq_ids_.count(); i++) {
|
||||
number::ObNumber num_val;
|
||||
ObSequenceValue seq_value;
|
||||
if (OB_FAIL(result->get_number(i, num_val, ctx.get_allocator()))) {
|
||||
LOG_WARN("failed to sequence val", K(ret));
|
||||
} else if (OB_FAIL(seq_value.set(num_val))) {
|
||||
LOG_WARN("failed to set value", K(ret));
|
||||
} else if (OB_FAIL(my_session->set_sequence_value(tenant_id, seq_ids_.at(i), seq_value))) {
|
||||
LOG_WARN("failed to set sequence value", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
ObSequenceOp::ObSequenceOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
|
||||
: ObOperator(exec_ctx, spec, input)
|
||||
{
|
||||
}
|
||||
|
||||
ObSequenceOp::~ObSequenceOp()
|
||||
{
|
||||
}
|
||||
|
||||
void ObSequenceOp::destroy()
|
||||
{
|
||||
for (int64_t i = 0; i < seq_executors_.count(); ++i) {
|
||||
seq_executors_.at(i)->destroy();
|
||||
}
|
||||
seq_executors_.reset();
|
||||
ObOperator::destroy();
|
||||
}
|
||||
|
||||
int ObSequenceOp::inner_open()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *my_session = NULL;
|
||||
if (OB_ISNULL(my_session = GET_MY_SESSION(ctx_))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get my session", K(ret));
|
||||
} else if (OB_FAIL(init_op())) {
|
||||
LOG_WARN("initialize operator context failed", K(ret));
|
||||
} else if (get_child_cnt() > 1) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("should not have more than 1 child", K(ret));
|
||||
} else if (0 < MY_SPEC.filters_.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("sequence operator should have no filter expr", K(ret));
|
||||
}
|
||||
ARRAY_FOREACH_X(MY_SPEC.nextval_seq_ids_, idx, cnt, OB_SUCC(ret)) {
|
||||
const uint64_t seq_id = MY_SPEC.nextval_seq_ids_.at(idx);
|
||||
const ObSequenceSchema *seq_schema = nullptr;
|
||||
uint64_t dblink_id = OB_INVALID_ID;
|
||||
ObSequenceExecutor *executor = NULL;
|
||||
if (OB_FAIL(my_session->get_dblink_sequence_schema(seq_id, seq_schema))) {
|
||||
LOG_WARN("failed to get dblink sequence schema", K(ret));
|
||||
} else if (NULL != seq_schema) {
|
||||
dblink_id = seq_schema->get_dblink_id();
|
||||
}
|
||||
for (int64_t i = 0; NULL==executor && OB_SUCC(ret) && i < seq_executors_.count(); ++i) {
|
||||
if (seq_executors_.at(i)->get_dblink_id() == dblink_id) {
|
||||
executor = seq_executors_.at(i);
|
||||
}
|
||||
}
|
||||
if (NULL != executor) {
|
||||
if (OB_FAIL(executor->add_sequence_id(seq_id))) {
|
||||
LOG_WARN("failed to add sequence id", K(ret));
|
||||
}
|
||||
} else if (OB_INVALID_ID == dblink_id) {
|
||||
//add local executor
|
||||
void *tmp = NULL;
|
||||
if (OB_ISNULL(tmp=ctx_.get_allocator().alloc(sizeof(ObLocalSequenceExecutor)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to alloc memory", K(ret));
|
||||
} else {
|
||||
executor = new(tmp) ObLocalSequenceExecutor();
|
||||
executor->set_dblink_id(dblink_id);
|
||||
if (OB_FAIL(executor->add_sequence_id(seq_id))) {
|
||||
LOG_WARN("failed to add sequence id", K(ret));
|
||||
} else if (OB_FAIL(seq_executors_.push_back(executor))) {
|
||||
LOG_WARN("failed to push back executor", K(ret));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
//add remote executor
|
||||
void *tmp = NULL;
|
||||
if (OB_ISNULL(tmp=ctx_.get_allocator().alloc(sizeof(ObRemoteSequenceExecutor)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("failed to alloc memory", K(ret));
|
||||
} else {
|
||||
executor = new(tmp) ObRemoteSequenceExecutor();
|
||||
executor->set_dblink_id(dblink_id);
|
||||
if (OB_FAIL(executor->add_sequence_id(seq_id))) {
|
||||
LOG_WARN("failed to add sequence id", K(ret));
|
||||
} else if (OB_FAIL(seq_executors_.push_back(executor))) {
|
||||
LOG_WARN("failed to push back executor", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ARRAY_FOREACH_X(seq_executors_, idx, cnt, OB_SUCC(ret)) {
|
||||
if (OB_FAIL(seq_executors_.at(idx)->init(ctx_))) {
|
||||
LOG_WARN("failed to init executor", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSequenceOp::inner_close()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
reset();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSequenceOp::init_op()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSequenceOp::inner_get_next_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(try_get_next_row())) {
|
||||
LOG_WARN_IGNORE_ITER_END(ret, "fail get next row", K(ret));
|
||||
} else {
|
||||
ARRAY_FOREACH_X(seq_executors_, idx, cnt, OB_SUCC(ret)) {
|
||||
ObSequenceExecutor *executor = seq_executors_.at(idx);
|
||||
if (OB_FAIL(executor->get_nextval(ctx_))) {
|
||||
LOG_WARN("fail get nextval for seq", K(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSequenceOp::try_get_next_row()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -20,11 +20,12 @@ namespace oceanbase
|
||||
{
|
||||
namespace sql
|
||||
{
|
||||
|
||||
class ObSequenceSpec : public ObOpSpec
|
||||
{
|
||||
OB_UNIS_VERSION_V(1);
|
||||
public:
|
||||
|
||||
|
||||
ObSequenceSpec(common::ObIAllocator &alloc, const ObPhyOperatorType type);
|
||||
|
||||
INHERIT_TO_STRING_KV("op_spec", ObOpSpec, K_(nextval_seq_ids));
|
||||
@ -42,6 +43,60 @@ public:
|
||||
common::ObFixedArray<uint64_t, common::ObIAllocator> nextval_seq_ids_;
|
||||
};
|
||||
|
||||
class ObSequenceExecutor {
|
||||
public:
|
||||
ObSequenceExecutor()
|
||||
:dblink_id_(OB_INVALID_ID) {}
|
||||
~ObSequenceExecutor() { destroy(); }
|
||||
virtual int init(ObExecContext &ctx)=0;
|
||||
virtual void reset() { seq_ids_.reset(); seq_schemas_.reset();}
|
||||
virtual void destroy() { seq_ids_.reset(); seq_schemas_.reset(); }
|
||||
virtual int get_nextval(ObExecContext &ctx)=0;
|
||||
uint64_t get_dblink_id() const { return dblink_id_; }
|
||||
void set_dblink_id(uint64_t id) { dblink_id_ = id; }
|
||||
int add_sequence_id(uint64_t id) { return seq_ids_.push_back(id); }
|
||||
TO_STRING_KV(K_(seq_ids), K_(dblink_id));
|
||||
protected:
|
||||
// schema 放入 context 中是为了利用它的 cache 能力
|
||||
common::ObSEArray<share::schema::ObSequenceSchema, 1> seq_schemas_;
|
||||
common::ObSEArray<uint64_t, 2> seq_ids_;
|
||||
uint64_t dblink_id_;
|
||||
};
|
||||
|
||||
class ObLocalSequenceExecutor : public ObSequenceExecutor {
|
||||
public:
|
||||
ObLocalSequenceExecutor();
|
||||
~ObLocalSequenceExecutor();
|
||||
virtual int init(ObExecContext &ctx) override;
|
||||
virtual void reset() override;
|
||||
virtual void destroy() override;
|
||||
virtual int get_nextval(ObExecContext &ctx) override;
|
||||
private:
|
||||
// sequence 暴露给用户层的是一个 cache
|
||||
// cache 底层负责做 sequence 的缓存更新以及全局的协调
|
||||
share::ObSequenceCache *sequence_cache_;
|
||||
};
|
||||
|
||||
class ObRemoteSequenceExecutor : public ObSequenceExecutor {
|
||||
public:
|
||||
ObRemoteSequenceExecutor();
|
||||
~ObRemoteSequenceExecutor();
|
||||
virtual int init(ObExecContext &ctx) override;
|
||||
virtual void reset() override;
|
||||
virtual void destroy() override;
|
||||
virtual int get_nextval(ObExecContext &ctx) override;
|
||||
private:
|
||||
int init_dblink_connection(ObExecContext &ctx);
|
||||
int init_sequence_sql(ObExecContext &ctx);
|
||||
int rescan();
|
||||
private:
|
||||
uint32_t sessid_;
|
||||
common::sqlclient::DblinkDriverProto link_type_;
|
||||
char* format_sql_;
|
||||
int64_t format_sql_length_;
|
||||
common::sqlclient::ObISQLConnection *dblink_conn_;
|
||||
};
|
||||
|
||||
class ObSequenceOp : public ObOperator
|
||||
{
|
||||
public:
|
||||
@ -54,16 +109,10 @@ public:
|
||||
|
||||
void reset()
|
||||
{
|
||||
sequence_cache_ = NULL;
|
||||
seq_schemas_.reset();
|
||||
seq_executors_.reset();
|
||||
}
|
||||
|
||||
virtual void destroy() override
|
||||
{
|
||||
sequence_cache_ = NULL;
|
||||
seq_schemas_.reset();
|
||||
ObOperator::destroy();
|
||||
}
|
||||
virtual void destroy() override;
|
||||
private:
|
||||
int init_op();
|
||||
/**
|
||||
@ -73,11 +122,7 @@ private:
|
||||
*/
|
||||
int try_get_next_row();
|
||||
private:
|
||||
// sequence 暴露给用户层的是一个 cache
|
||||
// cache 底层负责做 sequence 的缓存更新以及全局的协调
|
||||
share::ObSequenceCache *sequence_cache_;
|
||||
// schema 放入 context 中是为了利用它的 cache 能力
|
||||
common::ObSEArray<share::schema::ObSequenceSchema, 1> seq_schemas_;
|
||||
common::ObSEArray<ObSequenceExecutor*, 1> seq_executors_;
|
||||
};
|
||||
|
||||
} // end namespace sql
|
||||
|
||||
Reference in New Issue
Block a user