[CP] add serialization size check
This commit is contained in:
		| @ -55,6 +55,11 @@ if(ENABLE_SMART_VAR_CHECK) | ||||
|   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_SMART_VAR_CHECK") | ||||
| endif() | ||||
|  | ||||
| if(ENABLE_SERIALIZATION_CHECK) | ||||
|   set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_SERIALIZATION_CHECK") | ||||
|   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_SERIALIZATION_CHECK") | ||||
| endif() | ||||
|  | ||||
| if(ENABLE_DEBUG_LOG) | ||||
|   set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DENABLE_DEBUG_LOG") | ||||
|   set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DENABLE_DEBUG_LOG") | ||||
|  | ||||
| @ -18,6 +18,7 @@ ob_define(ENABLE_MEMORY_DIAGNOSIS OFF) | ||||
| ob_define(ENABLE_OBJ_LEAK_CHECK OFF) | ||||
| ob_define(ENABLE_FATAL_ERROR_HANG ON) | ||||
| ob_define(ENABLE_SMART_VAR_CHECK OFF) | ||||
| ob_define(ENABLE_SERIALIZATION_CHECK OFF) | ||||
| ob_define(ENABLE_COMPILE_DLL_MODE OFF) | ||||
| ob_define(OB_CMAKE_RULES_CHECK ON) | ||||
| ob_define(OB_STATIC_LINK_LGPL_DEPS ON) | ||||
|  | ||||
							
								
								
									
										1
									
								
								deps/oblib/src/lib/CMakeLists.txt
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								deps/oblib/src/lib/CMakeLists.txt
									
									
									
									
										vendored
									
									
								
							| @ -262,6 +262,7 @@ ob_set_subtarget(oblib_lib utility | ||||
|   utility/utility.cpp | ||||
|   utility/ob_backtrace.cpp | ||||
|   utility/ob_proto_trans_util.cpp | ||||
|   utility/ob_unify_serialize.cpp | ||||
| ) | ||||
|  | ||||
| ob_set_subtarget(oblib_lib ash | ||||
|  | ||||
| @ -291,6 +291,11 @@ const char *to_cstring<int64_t>(const int64_t &v) | ||||
|   return to_cstring<int64_t>(v, BoolType<false>()); | ||||
| } | ||||
|  | ||||
| const char *to_cstring(const int64_t v) | ||||
| { | ||||
|   return to_cstring<int64_t>(v, BoolType<false>()); | ||||
| } | ||||
|  | ||||
| //////////////////////////////////////////////////////////////// | ||||
|  | ||||
| int databuff_printf(char *buf, const int64_t buf_len, const char *fmt, ...) | ||||
|  | ||||
							
								
								
									
										2
									
								
								deps/oblib/src/lib/utility/ob_print_utils.h
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								deps/oblib/src/lib/utility/ob_print_utils.h
									
									
									
									
										vendored
									
									
								
							| @ -272,6 +272,8 @@ const char *to_cstring<const char *>(const char *const &str); | ||||
| template <> | ||||
| const char *to_cstring<int64_t>(const int64_t &v); | ||||
|  | ||||
| const char *to_cstring(const int64_t v); | ||||
|  | ||||
| template <typename T> | ||||
| const char *to_cstring(T *obj) | ||||
| { | ||||
|  | ||||
							
								
								
									
										49
									
								
								deps/oblib/src/lib/utility/ob_unify_serialize.cpp
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								deps/oblib/src/lib/utility/ob_unify_serialize.cpp
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @ -0,0 +1,49 @@ | ||||
| /** | ||||
|  * Copyright (c) 2021 OceanBase | ||||
|  * OceanBase CE is licensed under Mulan PubL v2. | ||||
|  * You can use this software according to the terms and conditions of the Mulan PubL v2. | ||||
|  * You may obtain a copy of Mulan PubL v2 at: | ||||
|  *          http://license.coscl.org.cn/MulanPubL-2.0 | ||||
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | ||||
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | ||||
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | ||||
|  * See the Mulan PubL v2 for more details. | ||||
|  */ | ||||
|  | ||||
|  | ||||
| #ifdef ENABLE_SERIALIZATION_CHECK | ||||
| #include "ob_unify_serialize.h" | ||||
| namespace oceanbase | ||||
| { | ||||
| namespace lib | ||||
| { | ||||
| RLOCAL(SerializeDiagnoseRecord, ser_diag_record); | ||||
| void begin_record_serialization() | ||||
| { | ||||
|   ser_diag_record.count = 0; | ||||
|   ser_diag_record.check_index = 0; | ||||
|   ser_diag_record.flag = 1; | ||||
| } | ||||
|  | ||||
| void finish_record_serialization() | ||||
| { | ||||
|   ser_diag_record.flag = 0; | ||||
| } | ||||
|  | ||||
| void begin_check_serialization() | ||||
| { | ||||
|   ser_diag_record.check_index = 0; | ||||
|   if (ser_diag_record.count > 0) { | ||||
|     ser_diag_record.flag = 2; | ||||
|   } | ||||
| } | ||||
|  | ||||
| void finish_check_serialization() | ||||
| { | ||||
|   ser_diag_record.count = -1; | ||||
|   ser_diag_record.check_index = -1; | ||||
|   ser_diag_record.flag = 0; | ||||
| } | ||||
| }  // namespace lib | ||||
| }  // namespace oceanbase | ||||
| #endif | ||||
							
								
								
									
										49
									
								
								deps/oblib/src/lib/utility/ob_unify_serialize.h
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										49
									
								
								deps/oblib/src/lib/utility/ob_unify_serialize.h
									
									
									
									
										vendored
									
									
								
							| @ -15,8 +15,26 @@ | ||||
|  | ||||
| #include "lib/utility/serialization.h" | ||||
|  | ||||
| namespace oceanbase { | ||||
| namespace lib { | ||||
| namespace oceanbase | ||||
| { | ||||
| namespace lib | ||||
| { | ||||
|  | ||||
| #ifdef ENABLE_SERIALIZATION_CHECK | ||||
| static constexpr int MAX_SERIALIZE_RECORD_LENGTH = 256; | ||||
| struct SerializeDiagnoseRecord | ||||
| { | ||||
|   uint8_t encoded_lens[MAX_SERIALIZE_RECORD_LENGTH]; | ||||
|   int count = -1; | ||||
|   int check_index = -1; | ||||
|   int flag = 0; | ||||
| }; | ||||
| RLOCAL_EXTERN(SerializeDiagnoseRecord, ser_diag_record); | ||||
| void begin_record_serialization(); | ||||
| void finish_record_serialization(); | ||||
| void begin_check_serialization(); | ||||
| void finish_check_serialization(); | ||||
| #endif | ||||
|  | ||||
| #define SERIAL_PARAMS char *buf, const int64_t buf_len, int64_t &pos | ||||
| #define DESERIAL_PARAMS const char *buf, const int64_t data_len, int64_t &pos | ||||
| @ -67,8 +85,32 @@ namespace lib { | ||||
|     }                                                                    \ | ||||
|   } | ||||
|  | ||||
| #ifdef ENABLE_SERIALIZATION_CHECK | ||||
| #define OB_UNIS_ADD_LEN(obj)                                                                                        \ | ||||
|   {                                                                                                                 \ | ||||
|     int64_t this_len = NS_::encoded_length(obj);                                                                    \ | ||||
|     if (!(std::is_same<uint8_t, decltype(obj)>::value || std::is_same<int8_t, decltype(obj)>::value ||              \ | ||||
|             std::is_same<bool, decltype(obj)>::value || std::is_same<char, decltype(obj)>::value)) {                \ | ||||
|       if (1 == oceanbase::lib::ser_diag_record.flag &&                                                              \ | ||||
|           oceanbase::lib::ser_diag_record.count < oceanbase::lib::MAX_SERIALIZE_RECORD_LENGTH) {                    \ | ||||
|         oceanbase::lib::ser_diag_record.encoded_lens[oceanbase::lib::ser_diag_record.count++] =                     \ | ||||
|             static_cast<uint8_t>(this_len);                                                                         \ | ||||
|       } else if (2 == oceanbase::lib::ser_diag_record.flag &&                                                       \ | ||||
|                  oceanbase::lib::ser_diag_record.check_index < oceanbase::lib::ser_diag_record.count) {             \ | ||||
|         int ret = OB_ERR_UNEXPECTED;                                                                                \ | ||||
|         int record_len = oceanbase::lib::ser_diag_record.encoded_lens[oceanbase::lib::ser_diag_record.check_index]; \ | ||||
|         if (this_len != record_len) {                                                                               \ | ||||
|           OB_LOG(ERROR, "encoded length not match", "name", MSTR(obj), K(this_len), K(record_len), K(obj));                 \ | ||||
|         }                                                                                                           \ | ||||
|         oceanbase::lib::ser_diag_record.check_index++;                                                              \ | ||||
|       }                                                                                                             \ | ||||
|     }                                                                                                               \ | ||||
|     len += this_len;                                                                                                \ | ||||
|   } | ||||
| #else | ||||
| #define OB_UNIS_ADD_LEN(obj)                                             \ | ||||
|   len += NS_::encoded_length(obj) | ||||
| #endif | ||||
| //----------------------------------------------------------------------- | ||||
|  | ||||
| // serialize_ no header | ||||
| @ -316,6 +358,7 @@ inline uint64_t &get_unis_compat_version() | ||||
| } | ||||
|  | ||||
| #define UNIS_VERSION_GUARD(x) | ||||
| }} // namespace oceanbase::lib | ||||
| }  // namespace lib | ||||
| }  // namespace oceanbase | ||||
|  | ||||
| #endif /* _OCEABASE_LIB_UTILITY_OB_UNIFY_SERIALIZE_H_ */ | ||||
|  | ||||
							
								
								
									
										11
									
								
								deps/oblib/src/rpc/obrpc/ob_rpc_endec.h
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										11
									
								
								deps/oblib/src/rpc/obrpc/ob_rpc_endec.h
									
									
									
									
										vendored
									
									
								
							| @ -47,7 +47,13 @@ template <typename T> | ||||
|   ObRpcPacket pkt; | ||||
|   const int64_t header_sz = pkt.get_header_size(); | ||||
|   int64_t extra_payload_size = calc_extra_payload_size(); | ||||
| #ifdef ENABLE_SERIALIZATION_CHECK | ||||
|   lib::begin_record_serialization(); | ||||
|   int64_t args_len = common::serialization::encoded_length(args); | ||||
|   lib::finish_record_serialization(); | ||||
| #else | ||||
|   int64_t args_len = common::serialization::encoded_length(args); | ||||
| #endif | ||||
|   int64_t payload_sz = extra_payload_size + args_len; | ||||
|   const int64_t reserve_bytes_for_pnio = 0; | ||||
|   char* header_buf = (char*)pool.alloc(reserve_bytes_for_pnio + header_sz + payload_sz) + reserve_bytes_for_pnio; | ||||
| @ -65,6 +71,11 @@ template <typename T> | ||||
|                          payload_buf, payload_sz, pos, args))) { | ||||
|     RPC_OBRPC_LOG(WARN, "serialize argument fail", K(pos), K(payload_sz), K(ret)); | ||||
|   } else if (OB_UNLIKELY(args_len < pos)) { | ||||
| #ifdef ENABLE_SERIALIZATION_CHECK | ||||
|     lib::begin_check_serialization(); | ||||
|     common::serialization::encoded_length(args); | ||||
|     lib::finish_check_serialization(); | ||||
| #endif | ||||
|     ret = OB_ERR_UNEXPECTED; | ||||
|     RPC_OBRPC_LOG(ERROR, "arg encoded length greater than arg length", K(ret), K(payload_sz), | ||||
|                   K(args_len), K(extra_payload_size), K(pos), K(pcode)); | ||||
|  | ||||
| @ -354,6 +354,8 @@ struct ElectionPriorityAdaptivedSerializationBuffer | ||||
|   } | ||||
|   unsigned char *priority_buffer_; | ||||
|   int64_t buffer_used_size_; | ||||
| public: | ||||
|   TO_STRING_KV(K(buffer_used_size_)); | ||||
| }; | ||||
| class ElectionAcceptResponseMsgMiddle : public ElectionMsgBase | ||||
| { | ||||
|  | ||||
| @ -863,6 +863,7 @@ public: | ||||
|   const common::ObIArray<common::ObString>& get_select_columns() const { return properties_names_; }; | ||||
|   static int64_t get_max_packet_buffer_length() { return obrpc::get_max_rpc_packet_size() - (1<<20); } | ||||
|   static int64_t get_max_buf_block_size() { return get_max_packet_buffer_length() - (1024*1024LL); } | ||||
|   TO_STRING_KV(K(properties_names_), K(row_count_), K(buf_.get_position())); | ||||
| private: | ||||
|   static const int64_t DEFAULT_BUF_BLOCK_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE - (1024*1024LL); | ||||
|   int alloc_buf_if_need(const int64_t size); | ||||
|  | ||||
| @ -331,6 +331,7 @@ public: | ||||
|   } | ||||
|   int save_success_task(const common::ObTabletID &succ_id) | ||||
|   { return succ_tablet_list_.push_back(succ_id); } | ||||
|   TO_STRING_KV(K(all_tablet_list_)); | ||||
| private: | ||||
|   int get_vt_svr_pair(uint64_t vt_id, const VirtualSvrPair *&vt_svr_pair); | ||||
|   int get_vt_tablet_loc(uint64_t table_id, | ||||
|  | ||||
| @ -415,6 +415,7 @@ public: | ||||
|                                          ObPushdownFilterNode *&pd_storage_filter); | ||||
|   static int64_t get_serialize_pushdown_filter_size(ObPushdownFilterNode *pd_filter_node); | ||||
|   // NEED_SERIALIZE_AND_DESERIALIZE; | ||||
|   TO_STRING_KV(KP(filter_tree_)); | ||||
| private: | ||||
|   common::ObIAllocator &alloc_; | ||||
|   ObPushdownFilterNode *filter_tree_; | ||||
|  | ||||
| @ -236,6 +236,7 @@ public: | ||||
|     } | ||||
|     return ret; | ||||
|   } | ||||
|   TO_STRING_KV(KP(ptr_value_)); | ||||
|  | ||||
| private: | ||||
|   union { | ||||
|  | ||||
| @ -1235,6 +1235,7 @@ struct ObSerCArray | ||||
|   ObSerCArray(T &data, U &cnt) : data_(data), cnt_(cnt) {} | ||||
|   T &data_; | ||||
|   U &cnt_; | ||||
|   TO_STRING_KV(K(cnt_)); | ||||
| }; | ||||
|  | ||||
| template <typename T, typename U> | ||||
|  | ||||
| @ -103,6 +103,7 @@ public: | ||||
|   common::ObFixedArray<ObExprResType, common::ObIAllocator> udf_attributes_types_; | ||||
|   // indicate the argument is const expr or not | ||||
|   common::ObFixedArray<bool, common::ObIAllocator> args_const_attr_; | ||||
|   TO_STRING_KV(K(udf_meta_), K(udf_attributes_), K(udf_attributes_types_), K(args_const_attr_)); | ||||
| }; | ||||
|  | ||||
| OB_DEF_SERIALIZE(ObDllUdfInfo<UF>, template<typename UF>) | ||||
|  | ||||
| @ -40,6 +40,7 @@ public: | ||||
|                         ObIExprExtraInfo *&copied_info) const = 0; | ||||
| public: | ||||
|   ObExprOperatorType type_; | ||||
|   TO_STRING_KV(K(type_)); | ||||
| }; | ||||
|  | ||||
| } // end namespace sql | ||||
|  | ||||
| @ -47,6 +47,8 @@ struct ObJoinFilterShareInfo | ||||
|   uint64_t filter_ptr_;   //此指针将作为PX JOIN FILTER CREATE算子共享内存. | ||||
|   uint64_t shared_msgs_;  //sqc-shared dh msgs | ||||
|   OB_UNIS_VERSION_V(1); | ||||
| public: | ||||
|   TO_STRING_KV(KP(unfinished_count_ptr_), KP(ch_provider_ptr_), KP(release_ref_ptr_), KP(filter_ptr_), K(shared_msgs_)); | ||||
| }; | ||||
|  | ||||
| struct ObJoinFilterRuntimeConfig | ||||
|  | ||||
| @ -681,6 +681,7 @@ public: | ||||
|  | ||||
|   ObPhyOperatorType get_type() const { return spec_.type_; } | ||||
|   const ObOpSpec &get_spec() const { return spec_; } | ||||
|   TO_STRING_KV(K(spec_)); | ||||
| protected: | ||||
|   ObExecContext &exec_ctx_; | ||||
|   const ObOpSpec &spec_; | ||||
|  | ||||
| @ -44,6 +44,7 @@ public: | ||||
|     bool is_inited() const { return PHY_INVALID != op_type_;  } | ||||
|     ObPhyOperatorType op_type_; | ||||
|     int64_t op_id_; | ||||
|     TO_STRING_KV(K(op_type_), K(op_id_)); | ||||
|   }; | ||||
| public: | ||||
|   virtual int inner_open() override; | ||||
|  | ||||
| @ -224,6 +224,9 @@ public: | ||||
|  | ||||
|  | ||||
|   DISALLOW_COPY_AND_ASSIGN(ObTaskExecutorCtx); | ||||
|   TO_STRING_KV(K(table_locations_), K(retry_times_), K(min_cluster_version_), K(expected_worker_cnt_), | ||||
|       K(admited_worker_cnt_), K(query_tenant_begin_schema_version_), K(query_sys_begin_schema_version_), | ||||
|       K(minimal_worker_cnt_)); | ||||
| }; | ||||
|  | ||||
| class ObExecutorRpcImpl; | ||||
|  | ||||
| @ -638,6 +638,7 @@ public: | ||||
|       uint32_t reserved_ : 29; | ||||
|     }; | ||||
|   }; | ||||
|   TO_STRING_KV(K(stmt_type_)); | ||||
| private: | ||||
|   share::ObFeedbackRerouteInfo *reroute_info_; | ||||
| }; | ||||
|  | ||||
| @ -68,6 +68,7 @@ public: | ||||
|   int erase_refactored(const common::ObString &key, ObSessionVariable *sess_var = NULL); | ||||
|   int64_t size() const {return map_.size();} | ||||
|   NEED_SERIALIZE_AND_DESERIALIZE; | ||||
|   TO_STRING_KV(K(size())); | ||||
| private: | ||||
|   int free_mem(); | ||||
|  | ||||
|  | ||||
| @ -509,6 +509,8 @@ struct ObInnerContextMap { | ||||
|   ObInnerContextHashMap *context_map_; | ||||
|   common::ObIAllocator &alloc_; | ||||
|   OB_UNIS_VERSION(1); | ||||
| public: | ||||
|   TO_STRING_KV(K(context_name_), K(context_map_->size())); | ||||
| }; | ||||
| typedef common::hash::ObHashMap<common::ObString, ObInnerContextMap *, | ||||
|                                 common::hash::NoPthreadDefendMode, | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 zhjc1124
					zhjc1124