diff --git a/deps/oblib/src/lib/json_type/ob_json_tree.cpp b/deps/oblib/src/lib/json_type/ob_json_tree.cpp index 714579372c..6b760f9b16 100644 --- a/deps/oblib/src/lib/json_type/ob_json_tree.cpp +++ b/deps/oblib/src/lib/json_type/ob_json_tree.cpp @@ -579,7 +579,8 @@ ObJsonNode *ObJsonObject::clone(ObIAllocator* allocator, bool is_deep_copy) cons for (uint64_t i = 0; i < len && OB_SUCC(ret); i++) { if (is_deep_copy) { char* str_buf = NULL; - if (OB_ISNULL(str_buf = static_cast(allocator->alloc(object_array_[i].get_key().length())))) { + bool is_key_empty = object_array_[i].get_key().length() == 0; + if (!is_key_empty && OB_ISNULL(str_buf = static_cast(allocator->alloc(object_array_[i].get_key().length())))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret), K(object_array_[i].get_key().length())); } else { diff --git a/src/pl/CMakeLists.txt b/src/pl/CMakeLists.txt index 253ec365ed..ac0220798d 100644 --- a/src/pl/CMakeLists.txt +++ b/src/pl/CMakeLists.txt @@ -34,6 +34,7 @@ ob_set_subtarget(ob_pl common ob_pl_stmt.cpp ob_pl_type.cpp ob_pl_user_type.cpp + ob_pl_json_type.cpp ob_pl_persistent.cpp ) diff --git a/src/pl/ob_pl.cpp b/src/pl/ob_pl.cpp index 5ad932d54c..a2b535752e 100644 --- a/src/pl/ob_pl.cpp +++ b/src/pl/ob_pl.cpp @@ -23,6 +23,7 @@ #include "pl/ob_pl_compile.h" #include "pl/ob_pl_code_generator.h" #include "pl/ob_pl_user_type.h" +#include "pl/ob_pl_json_type.h" #include "pl/ob_pl_stmt.h" #include "pl/ob_pl_interface_pragma.h" #include "observer/ob_server_struct.h" @@ -3027,6 +3028,13 @@ int ObPLExecState::final(int ret) exec_ctx_bak_.restore(*ctx_.exec_ctx_); } + if (OB_NOT_NULL(ctx_.exec_ctx_->get_my_session())) { + #ifdef OB_BUILD_ORACLE_PL + ObSQLSessionInfo *session = ctx_.exec_ctx_->get_my_session(); + ObPlJsonTypeManager::release_useless_resource(session->get_json_pl_mngr()); + #endif + } + return OB_SUCCESS; } diff --git a/src/pl/ob_pl_json_type.cpp b/src/pl/ob_pl_json_type.cpp new file mode 100644 index 0000000000..773b1b264d --- /dev/null +++ b/src/pl/ob_pl_json_type.cpp @@ -0,0 +1,477 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifdef OB_BUILD_ORACLE_PL +#define USING_LOG_PREFIX PL +#include "pl/ob_pl_user_type.h" +#include "pl/ob_pl_json_type.h" + +namespace oceanbase +{ +using namespace common; +using namespace share::schema; +using namespace jit; +using namespace obmysql; +using namespace sql; + +namespace pl +{ + +int ObPLJsonBaseType::deep_copy(ObPLOpaque *dst) +{ + int ret = OB_SUCCESS; + ObPlJsonNode* pl_node = data_; + ObJsonNode* ref_node = nullptr; + ObJsonNode* data_node = nullptr; + ObPlJsonTypeManager* pl_manager = nullptr; + if (OB_NOT_NULL(data_)) { + ref_node = pl_node->get_ref_node(); + data_node = pl_node->get_data_node(); + pl_manager = pl_node->get_manager(); + } + + ObPLJsonBaseType *copy = NULL; + OZ (ObPLOpaque::deep_copy(dst)); + CK (OB_NOT_NULL(copy = new(dst)ObPLJsonBaseType())); + OX (copy->set_err_behavior(static_cast(behavior_))); + if (OB_SUCC(ret) && OB_NOT_NULL(data_)) { + ObPlJsonNode* dup = nullptr; + if (OB_FAIL(pl_manager->create_empty_node(dup))) { + LOG_WARN("fail to create empty node", K(ret), K(pl_manager->get_map_count()), + K(pl_manager->get_list_count()), K(pl_manager->get_alloc_count()), + K(pl_manager->get_free_count()), K(pl_manager->get_holding_count())); + } else if (OB_FAIL(dup->assign(pl_node))) { + LOG_WARN("fail to assign node", K(ret)); + } else { + copy->set_data(dup); + if (OB_FAIL(pl_manager->check_candidate_list())) { + LOG_WARN("fail to checkin candidate list node.", K(ret)); + } + } + } + + return ret; +} + +ObPlJsonNode::ObPlJsonNode(ObPlJsonTypeManager *pl_handle, ObJsonNode* json_node) + : ObPlJsonNode(pl_handle) +{ + set_data_node(json_node); + increase_ref(); +} + + +int ObPlJsonNode::assign(ObPlJsonNode* from) +{ + int ret = OB_SUCCESS; + + if (from->get_ref_node()) { + ObJsonNode* origin = from->get_ref_node(); + ObPlJsonNode* from_origin = nullptr; + if (OB_FAIL(pl_manager_->get_node(origin, from_origin))) { + LOG_WARN("fail to get node from manger", K(ret)); + } else { + from_origin->increase_ref(); + set_ref_node(from_origin->get_data_node()); + set_data_node(from->get_data_node()); + } + } else { + from->increase_ref(); + set_ref_node(from->get_data_node()); + set_data_node(from->get_data_node()); + } + + return ret; +} + +int ObPlJsonNode::clone(ObPlJsonNode* other, bool is_deep_copy) +{ + return clone(other->get_data_node(), is_deep_copy); +} + +int ObPlJsonNode::clone(ObJsonNode* other, bool is_deep_copy) +{ + int ret = OB_SUCCESS; + ObJsonNode *json_dst = other->clone(&allocator_, true); + if (OB_ISNULL(json_dst)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory for clone json node failed", K(ret)); + } else { + set_data_node(json_dst); + increase_ref(); + } + + return ret; +} + +int ObPlJsonNode::parse_tree(const ObString& text, ObJsonInType in_type) +{ + int ret = OB_SUCCESS; + ObJsonNode* tree = nullptr; + if (OB_FAIL(ObJsonBaseFactory::get_json_tree(&allocator_, text, in_type, tree, ObJsonParser::JSN_RELAXED_FLAG))) { + LOG_WARN("fail to get json base", K(ret), K(in_type)); + } else { + set_data_node(tree); + increase_ref(); + } + + return ret; +} + +int ObPlJsonNode::unref() +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(get_ref_node())) { + ObPlJsonNode* dom_node = nullptr; + if (OB_FAIL(pl_manager_->get_node(get_ref_node(), dom_node))) { + LOG_WARN("fail to get node from manger", K(ret)); + } else if (OB_ISNULL(dom_node)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to unassign get dom node is null", K(ret)); + } else if (dom_node->decrease_ref() == 0) { + if (OB_FAIL(pl_manager_->remove_node(dom_node, false))) { + LOG_WARN("fail to remove node from manger", K(ret)); + } + } + } else if (decrease_ref() == 0) { + if (OB_FAIL(pl_manager_->remove_node(this, false))) { + LOG_WARN("fail to remove node from manger", K(ret)); + } + } + + return ret; +} + +void ObPlJsonNode::reuse() +{ + data_ = nullptr; + origin_ = nullptr; + ref_count_ = 0; + ref_type_ = 0; + allocator_.reset(); +} + +void ObPlJsonNode::free() +{ + reuse(); +} + +int ObPlJsonTypeManager::destroy_node(ObPlJsonNode* node) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(node->unref())) { + LOG_WARN("failed to unref current node", K(ret)); + } else if (OB_FAIL(check_candidate_list())) { + LOG_WARN("failed to eliminate node", K(ret)); + } else if (node->get_ref_node()) { + free_empty_node(node); + } + return ret; +} + +ObPlJsonTypeManager::ObPlJsonTypeManager(uint64_t tenant_id) + : dom_node_allocator_(sizeof(ObPlJsonNode), + common::OB_MALLOC_NORMAL_BLOCK_SIZE - 32, + ObMalloc(lib::ObMemAttr(tenant_id, "JsonPlDom"))), + candidates_(dom_node_allocator_), + json_dom_map_(), + tenant_id_(tenant_id), + is_init_(false) +{ +} + +void ObPlJsonTypeManager::free_empty_node(ObPlJsonNode* node) +{ + dom_node_allocator_.free(node); + ++free_count_; +} + + +int ObPlJsonTypeManager::create_empty_node(ObPlJsonNode*& res) +{ + int ret = OB_SUCCESS; + ObPlJsonNode* tmp = nullptr; + + if (OB_ISNULL(tmp = static_cast(dom_node_allocator_.alloc(PL_JSON_DOM_LEN)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to alloc empty dom node", K(ret), K(*this)); + } else { + res = new (tmp) ObPlJsonNode(this); + ++alloc_count_; + } + + LOG_DEBUG("json pl manager statistic:", K(get_map_count()), K(get_list_count()), + K(get_alloc_count()), K(get_free_count()), K(get_holding_count())); + + return ret; +} + +int ObPlJsonTypeManager::create_new_node(ObJsonNode* data, ObPlJsonNode*& res) +{ + int ret = OB_SUCCESS; + ObPlJsonNode* tmp = nullptr; + ObJsonNode* clone = nullptr; + + if (OB_FAIL(init())) { + LOG_WARN("failed to init", K(ret)); + } else if (OB_FAIL(create_empty_node(tmp))) { + LOG_WARN("failed to create empty node", K(ret), K(get_map_count()), K(get_list_count()), + K(get_alloc_count()), K(get_free_count()), K(get_holding_count())); + } else if (OB_ISNULL(clone = data->clone(&tmp->get_allocator(), true))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to clone node", K(ret)); + } else { + tmp->set_data_node(clone); + tmp->increase_ref(); + + if (OB_FAIL(add_node(tmp))) { + LOG_WARN("failed to add tree", K(ret), K(get_map_count()), K(get_list_count()), + K(get_alloc_count()), K(get_free_count()), K(get_holding_count())); + } else { + res = tmp; + } + } + return ret; +} + +int ObPlJsonTypeManager::create_ref_node(ObJsonNode* data, ObJsonNode* ref, ObPlJsonNode*& res) +{ + int ret = OB_SUCCESS; + ObPlJsonNode* tmp = nullptr; + ObPlJsonNode* origin = nullptr; + + if (OB_FAIL(init())) { + LOG_WARN("failed to init", K(ret)); + } else if (OB_FAIL(get_node(ref, origin))) { + LOG_WARN("failed to get node", K(ret)); + } else if (OB_FAIL(create_empty_node(tmp))) { + LOG_WARN("failed to create empty node", K(ret), K(get_map_count()), K(get_list_count()), + K(get_alloc_count()), K(get_free_count()), K(get_holding_count())); + } else { + tmp->set_data_node(data); + tmp->set_ref_node(origin->get_data_node()); + origin->increase_ref(); + + res = tmp; + } + return ret; +} + +int ObPlJsonTypeManager::create_new_node(const ObString& text, ObJsonInType in_type, ObPlJsonNode*& res) +{ + int ret = OB_SUCCESS; + + ObPlJsonNode* tmp = nullptr; + if (OB_FAIL(init())) { + LOG_WARN("failed to init", K(ret)); + } else if (OB_FAIL(create_empty_node(tmp))) { + LOG_WARN("failed to create empty node", K(ret), K(get_map_count()), K(get_list_count()), + K(get_alloc_count()), K(get_free_count()), K(get_holding_count())); + } else if (OB_FAIL(tmp->parse_tree(text, in_type))) { + LOG_WARN("failed to parse tree", K(ret)); + } else if (OB_FAIL(add_node(tmp))) { + LOG_WARN("failed to add tree", K(ret), K(get_map_count()), K(get_list_count()), + K(get_alloc_count()), K(get_free_count()), K(get_holding_count())); + } else { + res = tmp; + } + + return ret; +} + +int ObPlJsonTypeManager::init() +{ + int ret = OB_SUCCESS; + if (!is_init_) { + ObMemAttr bucket_attr(tenant_id_, "jsonPlBucket"); + ObMemAttr node_attr(tenant_id_, "jsonPlBuckNode"); + if (OB_FAIL(json_dom_map_.create(JSON_PL_BUCKET_NUM, bucket_attr, node_attr))) { + LOG_WARN("failed to create json bucket num", K(ret)); + } else { + is_init_ = true; + } + } + + return ret; +} + +int ObPlJsonTypeManager::add_node(ObPlJsonNode* node) +{ + int ret = OB_SUCCESS; + ObPlJsonNode* value = nullptr; + + if (OB_FAIL(init())) { + LOG_WARN("failed to init", K(ret)); + } else if (OB_NOT_NULL(node->data_) + && OB_FAIL(json_dom_map_.get_refactored(reinterpret_cast(node->data_), value))) { + if (ret == OB_HASH_NOT_EXIST) { + if (OB_FAIL(json_dom_map_.set_refactored(reinterpret_cast(node->data_), node))) { + LOG_WARN("failed to set json pl object into bucket.", K(ret)); + } + } + } + + return ret; +} + +int ObPlJsonTypeManager::remove_node(ObPlJsonNode* node, bool do_force) +{ + int ret = OB_SUCCESS; + ObPlJsonNode* dom_value = nullptr; + + if (OB_FAIL(init())) { + LOG_WARN("failed to init", K(ret)); + } else if (OB_FAIL(json_dom_map_.get_refactored(reinterpret_cast(node->data_), dom_value))) { + if (ret != OB_HASH_NOT_EXIST) { + LOG_WARN("failed to set json pl object into bucket.", K(ret)); + } else { + ret = OB_SUCCESS; + } + } else if (!do_force && OB_FAIL(candidates_.push_front(node))) { + LOG_WARN("failed to add into candidates list.", K(ret)); + } else if (OB_FAIL(json_dom_map_.erase_refactored(reinterpret_cast(node->data_)))) { + LOG_WARN("failed to remove from candidates list.", K(ret)); + } + + return ret; +} + +int ObPlJsonTypeManager::get_node(ObJsonNode* node, ObPlJsonNode*& value) +{ + int ret = OB_SUCCESS; + value = nullptr; + ObPlJsonNode* dom_value = nullptr; + + if (OB_FAIL(init())) { + LOG_WARN("failed to init", K(ret)); + } else if (OB_FAIL(json_dom_map_.get_refactored(reinterpret_cast(node), dom_value))) { + if (ret != OB_HASH_NOT_EXIST) { + LOG_WARN("failed to set json pl object into bucket.", K(ret)); + } else { + ObList::iterator it = candidates_.begin(); + for (; it != candidates_.end(); ++it) { + ObPlJsonNode* temp = *it; + if (temp->data_ == node) { + value = temp; + break; + } + } + + if (OB_NOT_NULL(value)) { + ret = OB_SUCCESS; + } + } + } else if (OB_ISNULL(dom_value)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get json node from hash.", K(ret)); + } else { + value = dom_value; + } + + return ret; +} + +int ObPlJsonTypeManager::check_candidate_list() +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(init())) { + LOG_WARN("failed to init", K(ret)); + } else { + ObList::iterator it = candidates_.begin(); + for (; it != candidates_.end(); ) { + ObPlJsonNode* temp = *it; + if (temp->ref_count()) { + if (OB_FAIL(json_dom_map_.set_refactored(reinterpret_cast(temp->data_), temp))) { + LOG_WARN("failed to init", K(ret)); + } else { + ++it; + } + } else if (!temp->ref_count()) { + ++it; + free(temp); + } + } + } + + return ret; +} + +void ObPlJsonTypeManager::free(ObPlJsonNode* node) +{ + node->free(); + candidates_.erase(node); + dom_node_allocator_.free(node); +} + +void ObPlJsonTypeManager::destroy() +{ + ObJsonDomMap::iterator iter = json_dom_map_.begin(); + for (; iter != json_dom_map_.end(); ) { + ObPlJsonNode* node = iter->second; + if (OB_NOT_NULL(node)) { + iter++; + node->free(); + } else { + ++iter; + } + } + + candidates_.clear(); + json_dom_map_.destroy(); + dom_node_allocator_.reset(); + is_init_ = false; +} +uint64_t ObPlJsonTypeManager::get_map_count() +{ + return json_dom_map_.size(); +} + +uint64_t ObPlJsonTypeManager::get_list_count() +{ + return candidates_.size(); +} + +uint64_t ObPlJsonTypeManager::get_alloc_count() +{ + return alloc_count_; +} + +uint64_t ObPlJsonTypeManager::get_free_count() +{ + return free_count_; +} + +uint64_t ObPlJsonTypeManager::get_holding_count() +{ + return alloc_count_ - free_count_; +} + +void ObPlJsonTypeManager::release(intptr_t handle) +{ + ObPlJsonTypeManager* manager = reinterpret_cast(handle); + if (manager) { + manager->destroy(); + } +} + +void ObPlJsonTypeManager::release_useless_resource(intptr_t handle) +{ + ObPlJsonTypeManager* manager = reinterpret_cast(handle); + if (manager) { + manager->check_candidate_list(); + } +} + +} // namespace pl +} // namespace oceanbase +#endif \ No newline at end of file diff --git a/src/pl/ob_pl_json_type.h b/src/pl/ob_pl_json_type.h new file mode 100644 index 0000000000..80b5433bbb --- /dev/null +++ b/src/pl/ob_pl_json_type.h @@ -0,0 +1,173 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifdef OB_BUILD_ORACLE_PL +#ifndef DEV_SRC_PL_OB_PL_JSON_TYPE_H_ +#define DEV_SRC_PL_OB_PL_JSON_TYPE_H_ +#include "pl/ob_pl_type.h" +#include "lib/hash/ob_hashmap.h" +#include "lib/json_type/ob_json_tree.h" +#include "lib/json_type/ob_json_parse.h" +#include "pl/ob_pl_user_type.h" + + +namespace oceanbase +{ +namespace pl +{ + +struct ObPlJsonNode; + +class ObPlJsonTypeManager { +public: + const uint32_t JSON_PL_BUCKET_NUM = 1000; + typedef common::hash::ObHashMap ObJsonDomMap; + + ObPlJsonTypeManager(uint64_t tenant_id); + int create_new_node(const ObString& text, ObJsonInType in_type, ObPlJsonNode*& res); + int create_ref_node(ObJsonNode* data, ObJsonNode* ref, ObPlJsonNode*& res); + int create_new_node(ObJsonNode* data, ObPlJsonNode*& res); + int create_empty_node(ObPlJsonNode*& res); + void free_empty_node(ObPlJsonNode* node); + int destroy_node(ObPlJsonNode* node); + int add_node(ObPlJsonNode*); + int remove_node(ObPlJsonNode*, bool force = true); + int get_node(ObJsonNode*, ObPlJsonNode*&); + int init(); + int check_candidate_list(); + void destroy(); + void free(ObPlJsonNode* node); + common::ObIAllocator* get_dom_node_allocator() { return &dom_node_allocator_; } + + static void release(intptr_t handle); + static void release_useless_resource(intptr_t handle); + + uint64_t get_map_count(); + uint64_t get_list_count(); + uint64_t get_alloc_count(); + uint64_t get_free_count(); + uint64_t get_holding_count(); + + common::ObSmallBlockAllocator<> dom_node_allocator_; + ObList candidates_; + ObJsonDomMap json_dom_map_; + uint64_t tenant_id_; + bool is_init_; + + uint64_t alloc_count_; + uint64_t free_count_; + + TO_STRING_KV("alloc total size", dom_node_allocator_.get_total_mem_size(), + "node map count", json_dom_map_.size(), + "list count", candidates_.size(), + K_(tenant_id), + K_(alloc_count), + K_(free_count)); +}; + +struct ObPlJsonNode { + ObPlJsonNode(ObPlJsonTypeManager *pl_handle) + : data_(nullptr), + origin_(nullptr), + ref_count_(0), + ref_type_(0), + allocator_(ObMemAttr(pl_handle->tenant_id_, "JsonPlManager"), OB_MALLOC_NORMAL_BLOCK_SIZE), + pl_manager_(pl_handle) {} + + ObPlJsonNode(ObPlJsonTypeManager *pl_handle, + ObJsonNode* json_node); + + int parse_tree(const ObString& text, ObJsonInType in_type); + common::ObIAllocator& get_allocator() { return allocator_; } + int clone(ObPlJsonNode* other, bool is_deep_copy = true); + int clone(ObJsonNode* other, bool is_deep_copy = true); + + int32_t increase_ref() { return ++ref_count_; } + int32_t decrease_ref() { return --ref_count_; } + void set_data_node(ObJsonNode* node) { data_ = node; } + void set_ref_node(ObJsonNode* node) { origin_ = node; } + ObJsonNode* get_ref_node() { return origin_; } + ObJsonNode* get_data_node() { return data_; } + int32_t ref_count() { return ref_count_; } + + + ObPlJsonTypeManager* get_manager() { return pl_manager_; } + + int unref(); + int assign(ObPlJsonNode* from); + + void reuse(); + void free(); + + ObJsonNode* data_; // for save current using obj + ObJsonNode* origin_; // for save reference original obj + int32_t ref_count_; + int32_t ref_type_; + common::ObArenaAllocator allocator_; + ObPlJsonTypeManager *pl_manager_; + + TO_STRING_KV(KPC(data_), KPC(origin_), K_(ref_count), KPC(pl_manager_)); +}; + +static uint32_t PL_JSON_DOM_LEN = sizeof(ObPlJsonNode); + +class ObPLJsonBaseType : public ObPLOpaque +{ +public: + enum JSN_ERR_BEHAVIOR { + JSN_PL_NULL_ON_ERR, + JSN_PL_ERR_ON_ERR, + JSN_PL_ERR_ON_EMP, + JSN_PL_ERR_ON_MISMATCH, + JSN_PL_ERR_ON_INVALID = 7 + }; + + ObPLJsonBaseType() + : ObPLOpaque(ObPLOpaqueType::PL_JSON_TYPE), + data_(NULL), + behavior_(0) + {} + + void destroy() + { + if (OB_NOT_NULL(data_)) { + ObPlJsonTypeManager* manager = data_->get_manager(); + manager->destroy_node(data_); + } + + data_ = NULL; + behavior_ = 0; + } + + virtual ~ObPLJsonBaseType() + { + destroy(); + } + +public: + virtual int deep_copy(ObPLOpaque *dst); + void set_data(ObPlJsonNode *data) { data_ = data; } + void set_err_behavior(int behavior) { behavior_ = behavior; } + int get_err_behavior() { return behavior_ ; } + ObPlJsonNode* get_data() { return data_; } + + TO_STRING_KV(KPC(data_), K_(behavior)); + +private: + ObPlJsonNode *data_; + int behavior_; +}; + +} // namespace pl +} // namespace oceanbase +#endif /* DEV_SRC_PL_OB_PL_JSON_TYPE_H_ */ +#endif \ No newline at end of file diff --git a/src/pl/ob_pl_user_type.cpp b/src/pl/ob_pl_user_type.cpp index 3660126362..8953f6642e 100644 --- a/src/pl/ob_pl_user_type.cpp +++ b/src/pl/ob_pl_user_type.cpp @@ -26,6 +26,7 @@ #include "pl/ob_pl_allocator.h" #include "share/ob_lob_access_utils.h" #include "observer/mysql/ob_query_driver.h" +#include "lib/json_type/ob_json_parse.h" namespace oceanbase { @@ -382,6 +383,14 @@ int ObUserDefinedType::destruct_obj(ObObj &src, ObSQLSessionInfo *session) case PL_OPAQUE_TYPE: { ObPLOpaque *opaque = reinterpret_cast(src.get_ext()); CK (OB_NOT_NULL(opaque)); + // json pl object manage + if (OB_NOT_NULL(opaque) && opaque->is_json_type()) { + ObPLJsonBaseType* pl_jsontype = static_cast(opaque); + ObPlJsonNode* pl_json_node = pl_jsontype->get_data(); + if (OB_NOT_NULL(pl_json_node) && OB_NOT_NULL(pl_json_node->get_data_node())) { + pl_jsontype->destroy(); + } + } OX (opaque->~ObPLOpaque()); } break; @@ -5606,32 +5615,6 @@ int ObPLXmlType::deep_copy(ObPLOpaque *dst) return ret; } -int ObPLJsonBaseType::deep_copy(ObPLOpaque *dst) -{ - int ret = OB_SUCCESS; - - ObPLJsonBaseType *copy = NULL; - OZ (ObPLOpaque::deep_copy(dst)); - CK (OB_NOT_NULL(copy = new(dst)ObPLJsonBaseType())); - OX (copy->set_err_behavior(static_cast(behavior_))); - if (OB_SUCC(ret) && OB_NOT_NULL(data_)) { - if (need_shallow_copy()) { - copy->set_data(data_); - copy->set_shallow_copy(1); - } else { - ObJsonNode *json_dst = data_->clone(©->get_allocator(), true); - if (OB_ISNULL(json_dst)) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("alloc memory for clone json node failed", K(ret)); - } else { - copy->set_data(json_dst); - } - } - } - - return ret; -} - //---------- for ObPLVarray ---------- int ObPLVArray::deep_copy(ObPLCollection *src, ObIAllocator *allocator, bool ignore_del_element) diff --git a/src/pl/ob_pl_user_type.h b/src/pl/ob_pl_user_type.h index cfd0b75754..c751af8438 100644 --- a/src/pl/ob_pl_user_type.h +++ b/src/pl/ob_pl_user_type.h @@ -14,6 +14,7 @@ #define DEV_SRC_PL_OB_PL_USER_TYPE_H_ #include "pl/ob_pl_type.h" #include "rpc/obmysql/ob_mysql_util.h" +#include "lib/hash/ob_hashmap.h" #include "lib/hash/ob_array_index_hash_set.h" #include "lib/container/ob_array_wrap.h" #include "lib/json_type/ob_json_tree.h" @@ -1438,47 +1439,6 @@ private: ObObj *data_; }; -class ObPLJsonBaseType : public ObPLOpaque -{ -public: - enum JSN_ERR_BEHAVIOR { - JSN_PL_NULL_ON_ERR, - JSN_PL_ERR_ON_ERR, - JSN_PL_ERR_ON_EMP, - JSN_PL_ERR_ON_MISMATCH, - JSN_PL_ERR_ON_INVALID = 7 - }; - - ObPLJsonBaseType() - : ObPLOpaque(ObPLOpaqueType::PL_JSON_TYPE), - data_(NULL), - behavior_(0), - is_shallow_copy_(0) - {} - - virtual ~ObPLJsonBaseType() - { - data_ = NULL; - behavior_ = 0; - } - -public: - virtual int deep_copy(ObPLOpaque *dst); - void set_data(ObJsonNode *data) { data_ = data; } - void set_err_behavior(int behavior) { behavior_ = behavior; } - int get_err_behavior() { return behavior_ ; } - ObJsonNode* get_data() { return data_; } - bool need_shallow_copy() { return is_shallow_copy_ > 0; } - void set_shallow_copy(int value) { is_shallow_copy_ = value; } - - TO_STRING_KV(KPC(data_), K_(behavior), K_(is_shallow_copy)); - -private: - ObJsonNode *data_; - int behavior_; - int is_shallow_copy_; -}; - #endif } // namespace pl diff --git a/src/pl/sys_package/ob_json_array_type.h b/src/pl/sys_package/ob_json_array_type.h index b3b09cab19..88786fda49 100644 --- a/src/pl/sys_package/ob_json_array_type.h +++ b/src/pl/sys_package/ob_json_array_type.h @@ -34,6 +34,7 @@ private: static int get_array_value(sql::ObExecContext &ctx, sql::ParamStore ¶ms, ObJsonNode *&json_val, + ObPlJsonNode *&pl_json_node, int& error_behavior, int expect_param_nums = 2); }; diff --git a/src/pl/sys_package/ob_json_object_type.h b/src/pl/sys_package/ob_json_object_type.h index 7a90a29921..8359661117 100644 --- a/src/pl/sys_package/ob_json_object_type.h +++ b/src/pl/sys_package/ob_json_object_type.h @@ -65,6 +65,7 @@ private: static int get_object_value(sql::ObExecContext &ctx, sql::ParamStore ¶ms, ObJsonNode *&json_val, + ObPlJsonNode *&pl_json_node, int& error_behavior, int expect_param_nums = 2); static int get_lob_proc(sql::ObExecContext &ctx, sql::ParamStore ¶ms, common::ObObj &result, bool is_clob); diff --git a/src/pl/sys_package/ob_json_pl_utils.h b/src/pl/sys_package/ob_json_pl_utils.h index db22ffc43a..025ff86194 100644 --- a/src/pl/sys_package/ob_json_pl_utils.h +++ b/src/pl/sys_package/ob_json_pl_utils.h @@ -17,7 +17,7 @@ #include "lib/json_type/ob_json_tree.h" #include "sql/engine/ob_exec_context.h" #include "sql/session/ob_sql_session_info.h" -#include "pl/ob_pl_user_type.h" +#include "pl/ob_pl_json_type.h" namespace oceanbase { @@ -41,7 +41,11 @@ public: static int parse(sql::ObExecContext &ctx, sql::ParamStore ¶ms, common::ObObj &result, ObJsonNodeType expect_type = ObJsonNodeType::J_ERROR); - static int get_jsontree(sql::ObExecContext &ctx, ObObj &obj, ObJsonNode *&json_doc, int32_t &err_behavior); + static int get_jsontree(sql::ObExecContext &ctx, + ObObj &obj, + ObJsonNode*& json_doc, + ObPlJsonNode*& pl_json_node, + int32_t &err_behavior); static int get_jsontype(sql::ObExecContext &ctx, sql::ParamStore ¶ms, ObJsonNodeType &json_type, @@ -51,9 +55,18 @@ public: static int make_jsontype(sql::ObExecContext &ctx, const ObString &str, ObJsonInType in_type, ObJsonNodeType expect_type, ObPLJsonBaseType *&jsontype); + static int make_jsontype(sql::ObExecContext &ctx, + ObJsonNode* data, + int behavior, + ObPLJsonBaseType *&jsontype); static int transform_JsonBase_2_PLJsonType(sql::ObExecContext &ctx, ObJsonNode* json_val, ObPLJsonBaseType *&jsontype); + static int transform_JsonBase_2_PLJsonType(sql::ObExecContext &ctx, + ObJsonNode* json_ref_val, + ObJsonNode* json_val, + ObPLJsonBaseType *&jsontype); + static int print_decimal(number::ObNumber &num, ObScale scale, ObJsonBuffer &j_buf); static int get_json_object(sql::ObExecContext &ctx, ObJsonNode*& json_val); static int get_json_array(sql::ObExecContext &ctx, ObJsonNode*& json_val); diff --git a/src/sql/engine/expr/ob_expr_treat.cpp b/src/sql/engine/expr/ob_expr_treat.cpp index cca1bba428..061fa0eb59 100644 --- a/src/sql/engine/expr/ob_expr_treat.cpp +++ b/src/sql/engine/expr/ob_expr_treat.cpp @@ -98,22 +98,20 @@ int ObExprTreat::cg_expr(ObExprCGCtx &expr_cg_ctx, static int treat_as_json_udt(const ObExpr &expr, ObEvalCtx &ctx, common::ObIAllocator &temp_allocator, pl::ObPLOpaque *opaque, ObDatum &res) { INIT_SUCC(ret); - ObJsonNode *json_doc = nullptr; pl::ObPLJsonBaseType *jsontype = nullptr; + pl::ObPlJsonNode *pl_json_node = nullptr; pl::ObPLJsonBaseType *new_jsontype = nullptr; ObObj res_obj; if(OB_ISNULL(jsontype = static_cast(opaque))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("cast to json type is null", K(ret), K(opaque)); - } else if(OB_ISNULL(json_doc = jsontype->get_data())) { + } else if(OB_ISNULL(pl_json_node = jsontype->get_data())) { res.set_null(); } else { - ObJsonNode * json_node_copy = nullptr; - if (OB_ISNULL(json_node_copy = json_doc->clone(&ctx.exec_ctx_.get_allocator()))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("failed to clone json node", K(ret)); - } else if (OB_FAIL(pl::ObPlJsonUtil::transform_JsonBase_2_PLJsonType(ctx.exec_ctx_, json_node_copy, new_jsontype))) { + if (OB_FAIL(pl::ObPlJsonUtil::transform_JsonBase_2_PLJsonType(ctx.exec_ctx_, + pl_json_node->get_ref_node() ? pl_json_node->get_ref_node() : pl_json_node->get_data_node(), + new_jsontype))) { LOG_WARN("failed to transfrom ObJsonNode to ObPLJsonBaseType", K(ret)); } else if(OB_ISNULL(new_jsontype)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/session/ob_basic_session_info.cpp b/src/sql/session/ob_basic_session_info.cpp index f54663b733..23fd881838 100644 --- a/src/sql/session/ob_basic_session_info.cpp +++ b/src/sql/session/ob_basic_session_info.cpp @@ -36,6 +36,7 @@ #include "share/rc/ob_tenant_base.h" #include "pl/sys_package/ob_dbms_sql.h" #include "pl/ob_pl_package_state.h" +#include "pl/ob_pl_json_type.h" #include "rpc/obmysql/ob_sql_sock_session.h" #include "sql/engine/expr/ob_expr_regexp_context.h" @@ -94,6 +95,7 @@ ObBasicSessionInfo::ObBasicSessionInfo(const uint64_t tenant_id) package_info_allocator_(sizeof(pl::ObPLPackageState), common::OB_MALLOC_NORMAL_BLOCK_SIZE - 32, ObMalloc(lib::ObMemAttr(orig_tenant_id_, "SessPackageInfo"))), name_pool_(lib::ObMemAttr(orig_tenant_id_, ObModIds::OB_SQL_SESSION), OB_MALLOC_NORMAL_BLOCK_SIZE), + json_pl_mngr_(0), trans_flags_(), sql_scope_flags_(), need_reset_package_(false), @@ -286,6 +288,9 @@ void ObBasicSessionInfo::destroy() } total_stmt_tables_.reset(); cur_stmt_tables_.reset(); +#ifdef OB_BUILD_ORACLE_PL + pl::ObPlJsonTypeManager::release(get_json_pl_mngr()); +#endif } void ObBasicSessionInfo::clean_status() @@ -6657,5 +6662,48 @@ observer::ObSMConnection *ObBasicSessionInfo::get_sm_connection() } return conn; } + +void ObBasicSessionInfo::destory_json_pl_mngr() +{ +#ifdef OB_BUILD_ORACLE_PL + if (json_pl_mngr_) { + pl::ObPlJsonTypeManager* handle = reinterpret_cast(json_pl_mngr_); + handle->destroy(); + common::ObIAllocator& allocator = get_session_allocator(); + allocator.free(handle); + json_pl_mngr_ = 0; + } +#endif +} + +intptr_t ObBasicSessionInfo::get_json_pl_mngr() +{ + int ret = OB_SUCCESS; +#ifdef OB_BUILD_ORACLE_PL + if (!json_pl_mngr_) { + common::ObIAllocator& allocator = get_session_allocator(); + pl::ObPlJsonTypeManager* handle = static_cast( + allocator.alloc(sizeof(pl::ObPlJsonTypeManager))); + if (OB_ISNULL(handle)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("failed to allocate handle", K(ret)); + } else { + handle = new (handle) pl::ObPlJsonTypeManager(orig_tenant_id_); + if (OB_FAIL(handle->init())) { + allocator.free(handle); + LOG_WARN("failed to init json pl type manager", K(ret)); + } else { + json_pl_mngr_ = reinterpret_cast(handle); + } + } + } +#else + ret = OB_NOT_SUPPORTED; + LOG_WARN("failed to create json pl type manager", K(ret)); +#endif + + return json_pl_mngr_; +} + }//end of namespace sql }//end of namespace oceanbase diff --git a/src/sql/session/ob_basic_session_info.h b/src/sql/session/ob_basic_session_info.h index 7e44a2ab07..ec74707e60 100644 --- a/src/sql/session/ob_basic_session_info.h +++ b/src/sql/session/ob_basic_session_info.h @@ -592,8 +592,8 @@ public: const common::ObLogIdLevelMap *get_log_id_level_map() const; const common::ObString &get_client_version() const { return client_version_; } const common::ObString &get_driver_version() const { return driver_version_; } - - + void destory_json_pl_mngr(); + intptr_t get_json_pl_mngr(); int get_tx_timeout(int64_t &tx_timeout) const { tx_timeout = sys_vars_cache_.get_ob_trx_timeout(); @@ -2181,6 +2181,7 @@ protected: common::ObSmallBlockAllocator<> cursor_info_allocator_; // for alloc memory of PS CURSOR/SERVER REF CURSOR common::ObSmallBlockAllocator<> package_info_allocator_; // for alloc memory of session package state common::ObStringBuf name_pool_; // for variables names and statement names + intptr_t json_pl_mngr_; // for pl json manage TransFlags trans_flags_; SqlScopeFlags sql_scope_flags_; bool need_reset_package_; // for dbms_session.reset_package