diff --git a/src/objit/include/objit/common/ob_item_type.h b/src/objit/include/objit/common/ob_item_type.h index 05459b871f..4ce7ca1d38 100644 --- a/src/objit/include/objit/common/ob_item_type.h +++ b/src/objit/include/objit/common/ob_item_type.h @@ -2702,6 +2702,7 @@ typedef enum ObItemType T_PX_NODE_POLICY = 4774, T_PX_NODE_ADDRS = 4775, T_PX_NODE_COUNT = 4776, + T_LOAD_DATA_URL = 4777, T_MAX //Attention: add a new type before T_MAX } ObItemType; diff --git a/src/share/CMakeLists.txt b/src/share/CMakeLists.txt index a2829cdba8..22dafad467 100644 --- a/src/share/CMakeLists.txt +++ b/src/share/CMakeLists.txt @@ -563,6 +563,7 @@ ob_set_subtarget(ob_share external_table external_table/ob_external_table_file_rpc_processor.cpp external_table/ob_external_table_file_task.cpp external_table/ob_external_table_utils.cpp + external_table/ob_partition_id_row_pair.cpp ) ob_set_subtarget(ob_share io diff --git a/src/share/external_table/ob_partition_id_row_pair.cpp b/src/share/external_table/ob_partition_id_row_pair.cpp new file mode 100644 index 0000000000..a8751a9968 --- /dev/null +++ b/src/share/external_table/ob_partition_id_row_pair.cpp @@ -0,0 +1,129 @@ + /** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX SQL +#include "share/external_table/ob_partition_id_row_pair.h" + +namespace oceanbase +{ + +namespace share +{ + +int ObPartitionIdRowPairArray::set_part_pair_by_idx(const int64_t idx, ObPartitionIdRowPair &pair) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(idx < 0 || idx >= pairs_.count())) { + ret = OB_INDEX_OUT_OF_RANGE; + LOG_WARN("array index out of range", K(ret), K(idx), K(pairs_.count())); + } else { + pairs_.at(idx) = pair; + } + return ret; +} +int ObPartitionIdRowPairArray::reserve(const int64_t capacity) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(capacity < 0)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid capacity", K(ret), K(capacity)); + } else if (OB_FAIL(pairs_.allocate_array(allocator_, capacity))) { + LOG_WARN("fail to reserve array", K(ret), K(capacity)); + } + return ret; +} + +int ObPartitionIdRowPairArray::serialize(char *buf, const int64_t buf_len, int64_t &pos) const +{ + int ret = OB_SUCCESS; + // 序列化数组大小 + if (OB_FAIL(serialization::encode_vi64(buf, buf_len, pos, pairs_.count()))) { + LOG_WARN("fail to encode count", K(ret)); + } + // 序列化每个pair + for (int64_t i = 0; OB_SUCC(ret) && i < pairs_.count(); i++) { + const ObPartitionIdRowPair &pair = pairs_.at(i); + LST_DO_CODE(OB_UNIS_ENCODE, pair.part_id_); + if (OB_SUCC(ret)) { + if (OB_FAIL(pair.list_row_value_.serialize(buf, buf_len, pos))) { + LOG_WARN("fail to serialize row", K(ret)); + } + } + } + return ret; +} + +int ObPartitionIdRowPairArray::deserialize(const char *buf, const int64_t data_len, int64_t &pos) +{ + int ret = OB_SUCCESS; + // 反序列化数组大小 + int64_t count = 0; + if (OB_FAIL(serialization::decode_vi64(buf, data_len, pos, &count))) { + LOG_WARN("fail to decode count", K(ret)); + } else if (count < 0) { + LOG_WARN("invalid count", K(ret), K(count)); + ret = OB_INVALID_ARGUMENT; + } else if (count == 0) { + // 如果count为0,则不进行反序列化 + return ret; + } else if (OB_FAIL(pairs_.allocate_array(allocator_, count))) { + LOG_WARN("fail to alloc pairs array", K(ret), K(count)); + } else { + // 分配临时内存用于ObObj数组 + ObArenaAllocator tmp_allocator("PartIdRowPair"); + void *tmp_buf = NULL; + ObObj *obj_array = NULL; + const int obj_capacity = 1024; + + if (OB_ISNULL(tmp_buf = tmp_allocator.alloc(sizeof(ObObj) * obj_capacity))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to alloc buf", KR(ret)); + } else if (OB_ISNULL(obj_array = new (tmp_buf) ObObj[obj_capacity])) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("fail to new obj array", KR(ret)); + } else { + // 反序列化每个pair + for (int64_t i = 0; OB_SUCC(ret) && i < count; i++) { + ObPartitionIdRowPair pair; + ObNewRow row; + ObNewRow tmp_row; + row.assign(obj_array, obj_capacity); + + LST_DO_CODE(OB_UNIS_DECODE, pair.part_id_); + if (OB_FAIL(ret)) { + LOG_WARN("fail to decode part_id", K(ret)); + } else if (OB_FAIL(row.deserialize(buf, data_len, pos))) { + LOG_WARN("fail to deserialize row", K(ret)); + } else if (OB_FAIL(ob_write_row(allocator_, row, tmp_row))) { + LOG_WARN("fail to write row", K(ret)); + } else { + pair.list_row_value_ = tmp_row; + pairs_.at(i) = pair; + } + } + } + } + return ret; +} + +int64_t ObPartitionIdRowPairArray::get_serialize_size() const +{ + int64_t len = serialization::encoded_length_vi64(pairs_.count()); + for (int64_t i = 0; i < pairs_.count(); i++) { + const ObPartitionIdRowPair &pair = pairs_.at(i); + len += serialization::encoded_length_vi64(pair.part_id_); + len += pair.list_row_value_.get_serialize_size(); + } + return len; +} + +} // namespace share +} // namespace oceanbase diff --git a/src/share/external_table/ob_partition_id_row_pair.h b/src/share/external_table/ob_partition_id_row_pair.h new file mode 100644 index 0000000000..da6801a8ec --- /dev/null +++ b/src/share/external_table/ob_partition_id_row_pair.h @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef _OB_PARTITION_ID_ROW_PAIR_H +#define _OB_PARTITION_ID_ROW_PAIR_H + +#include +#include "src/share/schema/ob_column_schema.h" + +namespace oceanbase +{ + +namespace share +{ +struct ObPartitionIdRowPair { + ObPartitionIdRowPair() + : part_id_(common::OB_INVALID_ID), + list_row_value_() {} + + TO_STRING_KV(K_(part_id), K_(list_row_value)); + + int64_t part_id_; + common::ObNewRow list_row_value_; +}; + +class ObPartitionIdRowPairArray +{ +public: + ObPartitionIdRowPairArray(common::ObIAllocator &alloc) + : pairs_(), + allocator_(alloc) {} + + ~ObPartitionIdRowPairArray() {} + + TO_STRING_KV(K_(pairs)); + + int set_part_pair_by_idx(const int64_t idx, ObPartitionIdRowPair &pair); + + int reserve(const int64_t capacity); + int serialize(char *buf, const int64_t buf_len, int64_t &pos) const; + int deserialize(const char *buf, const int64_t data_len, int64_t &pos); + int64_t get_serialize_size() const; + int64_t count() const { return pairs_.count(); } + + const ObPartitionIdRowPair &at(const int64_t idx) const { return pairs_.at(idx); } + +private: + common::ObArrayWrap pairs_; + common::ObIAllocator &allocator_; +}; +} +} +#endif /* _OB_PARTITION_ID_ROW_PAIR_H */ diff --git a/src/sql/das/ob_das_scan_op.cpp b/src/sql/das/ob_das_scan_op.cpp index d31378bdfb..db426b26bb 100644 --- a/src/sql/das/ob_das_scan_op.cpp +++ b/src/sql/das/ob_das_scan_op.cpp @@ -47,7 +47,7 @@ namespace sql { OB_SERIALIZE_MEMBER(ObDASTCBMemProfileKey, fake_unique_id_, timestamp_); -OB_SERIALIZE_MEMBER(ObDASScanCtDef, // FARM COMPAT WHITELIST +OB_SERIALIZE_MEMBER(ObDASScanCtDef, ref_table_id_, access_column_ids_, schema_version_, @@ -71,8 +71,9 @@ OB_SERIALIZE_MEMBER(ObDASScanCtDef, // FARM COMPAT WHITELIST vec_vid_idx_, multivalue_idx_, multivalue_type_, - index_merge_idx_, // FARM COMPAT WHITELIST - flags_); // FARM COMPAT WHITELIST + index_merge_idx_, + flags_, + partition_infos_); OB_DEF_SERIALIZE(ObDASScanRtDef) { diff --git a/src/sql/das/ob_das_scan_op.h b/src/sql/das/ob_das_scan_op.h index e3df9f7b16..bf0dff02b7 100644 --- a/src/sql/das/ob_das_scan_op.h +++ b/src/sql/das/ob_das_scan_op.h @@ -19,6 +19,7 @@ #include "sql/das/ob_group_scan_iter.h" #include "sql/das/iter/ob_das_iter.h" #include "sql/rewrite/ob_query_range.h" +#include "share/external_table/ob_partition_id_row_pair.h" namespace oceanbase { @@ -91,6 +92,7 @@ public: external_file_location_(alloc), external_files_(alloc), external_file_format_str_(alloc), + partition_infos_(alloc), trans_info_expr_(nullptr), ir_scan_type_(ObTSCIRScanType::OB_NOT_A_SPEC_SCAN), rowkey_exprs_(alloc), @@ -161,6 +163,7 @@ public: ObExternalFileFormat::StringData external_file_location_; ExternalFileNameArray external_files_; //for external table scan TODO jim.wjh remove ObExternalFileFormat::StringData external_file_format_str_; + share::ObPartitionIdRowPairArray partition_infos_; ObExpr *trans_info_expr_; // transaction information pseudo-column ObTSCIRScanType ir_scan_type_; // specify retrieval scan type sql::ExprFixedArray rowkey_exprs_; // store rowkey exprs for index lookup