/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include "share/ob_define.h" #include "lib/container/ob_se_array.h" #include "lib/container/ob_bit_set.h" #include "lib/string/ob_sql_string.h" #include "lib/hash/ob_hashmap.h" #include "common/object/ob_object.h" #ifndef OCEANBASE_SQL_ENGINE_CMD_LOAD_DATA_UTILS_H_ #define OCEANBASE_SQL_ENGINE_CMD_LOAD_DATA_UTILS_H_ namespace oceanbase { namespace sql { enum class ObLoadTaskResultFlag; enum class ObLoadDupActionType; class ObSQLSessionInfo; typedef common::ObBitSet ObExprValueBitSet; /* A state machine to handle backslash from a char stream */ class ObLoadEscapeSM { public: static const int64_t ESCAPE_CHAR_MYSQL = static_cast('\\'); static const int64_t ESCAPE_CHAR_ORACLE = static_cast('\''); ObLoadEscapeSM() : is_escaped_flag_(false), escape_char_(INT64_MAX), escaped_char_count(0) {} OB_INLINE void shift_by_input(char c) { /* there are 4 situations: * 1. STATE : c == \\ && is_escaped_flag_ == True Action: is_escaped_flag_ = False * 2. STATE : c == \\ && is_escaped_flag_ == False Action: is_escaped_flag_ = True * 3. STATE : c != \\ && is_escaped_flag_ == True Action: is_escaped_flag_ = False * 4. STATE : c != \\ && is_escaped_flag_ == False Action: is_escaped_flag_ = False * only state 1-3 need to change is_escaped_flag_, but usual case is state 4 */ if (OB_LIKELY(static_cast(c) != escape_char_ && !is_escaped_flag_)) { //situation 4, do nothing } else { //situation 1-3 is_escaped_flag_ = !is_escaped_flag_; if (is_escaped_flag_) { escaped_char_count++; } } } OB_INLINE bool is_escaping() { return is_escaped_flag_; } void set_escape_char(int64_t escape_char) { escape_char_ = escape_char; } void reset() { is_escaped_flag_ = false; escaped_char_count = 0; } int64_t get_escaped_char_count() { return escaped_char_count; } private: bool is_escaped_flag_; int64_t escape_char_; int64_t escaped_char_count; }; class ObLoadDataUtils { public: static const char *NULL_STRING; static const char NULL_VALUE_FLAG; static inline void remove_last_slash(common::ObString &value) { const char *data = value.ptr(); int32_t data_len = value.length(); if (OB_LIKELY(data_len) > 0 && OB_UNLIKELY(data[data_len - 1] == '\\')) { bool is_escaped = false; for (int32_t i = data_len - 2; i >= 0 && data[i] == '\\'; --i) { is_escaped = !is_escaped; } if (!is_escaped) { value.assign_ptr(data, data_len - 1); } } } static inline int str_write_buf(const common::ObString &str, char *&buf, int64_t &buf_len) { int ret = common::OB_SUCCESS; int64_t data_len = str.length(); if (OB_UNLIKELY(buf_len <= data_len)) { ret = common::OB_SIZE_OVERFLOW; } else { MEMCPY(buf, str.ptr(), data_len); buf += data_len; buf_len -= data_len; } return ret; } static inline int char_write_buf(char c, char *&buf, int64_t &buf_len) { int ret = common::OB_SUCCESS; if (OB_UNLIKELY(buf_len <= 1)) { ret = common::OB_SIZE_OVERFLOW; } else { buf[0] = c; buf++; buf_len--; } return ret; } static inline int escape_str_write_buf(const common::ObHexEscapeSqlStr &str, char *&buf, int64_t &buf_len) { int ret = common::OB_SUCCESS; int64_t data_len = str.to_string(buf, buf_len); if (OB_UNLIKELY(buf_len <= data_len)) { ret = common::OB_SIZE_OVERFLOW; } else { buf += data_len; buf_len -= data_len; } return ret; } static inline bool is_null_field(const common::ObString &field_str) { int ret_bool = false; if (field_str.length() == 1 && *field_str.ptr() == NULL_VALUE_FLAG) { ret_bool = true; } return ret_bool; } static inline bool is_zero_field(const common::ObString &field_str) { int ret_bool = false; if (field_str.length() == 2 && field_str.ptr()[0] == '\xff' && field_str.ptr()[1] == '\xff') { ret_bool = true; } return ret_bool; } static common::ObString escape_quotation(const common::ObString &value, common::ObDataBuffer &data_buf); static int init_empty_string_array(common::ObIArray &new_array, int64_t array_size); static int build_insert_sql_string_head(ObLoadDupActionType insert_mode, const common::ObString &table_name, const common::ObIArray &insert_keys, common::ObSqlString &insertsql_keys); static int append_values_in_remote_process(int64_t table_column_count, int64_t append_values_count, const ObExprValueBitSet &expr_bitset, const common::ObIArray &insert_values, common::ObSqlString &insertsql, common::ObDataBuffer &data_buffer, int64_t skipped_row_count = 0); static int append_values_for_one_row(const int64_t table_column_count, const ObExprValueBitSet &expr_value_bitset, const common::ObIArray &insert_values, common::ObSqlString &insertsql, common::ObDataBuffer &data_buffer, const int64_t skipped_row_count = 0); static int append_value(const common::ObString &cur_column_str, common::ObSqlString &sqlstr_values, bool is_expr_value); static int append_values_in_local_process(const int64_t key_columns, const int64_t values_count, const common::ObIArray &insert_values, const ObExprValueBitSet &expr_value_bitset, common::ObSqlString &insertsql, common::ObDataBuffer &data_buffer); static inline bool has_flag(int64_t &task_status, int64_t flag) { return 0 != (task_status & (1<(ResFlag::INVALID_MAX_FLAG) < 64, "ObLoadTaskResultFlag max value should less than bit size of int64_t"); OB_INLINE void set_flag(ResFlag flag) { task_status_ |= (1 << static_cast(flag)); } OB_INLINE bool has_flag(ResFlag flag) { return 0 != (task_status_ & (1 << static_cast(flag))); } TO_STRING_KV(K_(task_status)); OB_UNIS_VERSION(1); private: int64_t task_status_; }; class ObLoadDataTimer { public: ObLoadDataTimer(): total_time_us_(0), temp_start_time_us_(-1) {} OB_INLINE void start_stat() { UNUSED(temp_start_time_us_); #ifdef TIME_STAT_ON temp_start_time_us_ = ObTimeUtility::current_time(); #endif } OB_INLINE void end_stat() { #ifdef TIME_STAT_ON if (temp_start_time_us_ != -1) { total_time_us_ += (ObTimeUtility::current_time() - temp_start_time_us_); temp_start_time_us_ = -1; } #endif } int64_t get_wait_secs() const { return total_time_us_/1000000; } TO_STRING_KV("secs", get_wait_secs()); private: int64_t total_time_us_; int64_t temp_start_time_us_; }; /* * ObKMPStateMachine is a str matcher * efficently implimented using KMP algorithem * to detect a given str from a char stream */ class ObKMPStateMachine { public: ObKMPStateMachine(): is_inited_(false), str_(NULL), str_len_(0), matched_pos_(0), next_(NULL) {} int init(common::ObIAllocator &allocator, const common::ObString &str); /* * accept one char at a time, and update the state of the detector * return true if succ matching target str ending with the current char */ OB_INLINE bool accept_char(const char c) { bool ret_bool = false; if (OB_UNLIKELY(!is_inited_)) { SQL_ENG_LOG(ERROR, "ObKmpSeparatorDetector not inited."); } else { while (matched_pos_ > 0 && c != str_[matched_pos_]) { matched_pos_ = next_[matched_pos_]; } if (c == str_[matched_pos_]) { matched_pos_++; } if (matched_pos_ == str_len_) { matched_pos_ = 0; ret_bool = true; } } return ret_bool; } OB_INLINE int32_t get_pattern_length() { return str_len_; } bool scan_buf(char *&cur_pos, const char* buf_end); void reuse() { matched_pos_ = 0; } private: static const int KEY_WORD_MAX_LENGTH = 2 * 1024; bool is_inited_; char* str_; //string pattern for matching int32_t str_len_; int32_t matched_pos_; //can opt to pointer int32_t* next_; //next array of KMP algorithm }; struct ObLoadDataGID { static volatile int64_t GlobalLoadDataID; static void generate_new_id(ObLoadDataGID &gid) { gid.id = ATOMIC_AAF(&GlobalLoadDataID, 1); } ObLoadDataGID() : id(-1) {} bool is_valid() const { return id > 0; } uint64_t hash() const { return common::murmurhash(&id, sizeof(id), 0); } bool operator==(const ObLoadDataGID &other) const { return id == other.id; } void operator=(const ObLoadDataGID &other) { id = other.id; } int64_t id; TO_STRING_KV(K(id)); OB_UNIS_VERSION(1); }; struct ObLoadDataStat { ObLoadDataStat() : ref_cnt_(0), tenant_id_(0), job_id_(0), table_name_(), file_path_(), table_column_(0), file_column_(0), batch_size_(0), parallel_(1), load_mode_(0), start_time_(0), estimated_remaining_time_(0), total_bytes_(0), read_bytes_(0), processed_bytes_(0), processed_rows_(0), total_shuffle_task_(0), total_insert_task_(0), shuffle_rt_sum_(0), insert_rt_sum_(0), total_wait_secs_(0) {} int64_t aquire() { return ATOMIC_AAF(&ref_cnt_, 1); } int64_t release() { return ATOMIC_AAF(&ref_cnt_, -1); } int64_t get_ref_cnt() { return ATOMIC_LOAD(&ref_cnt_); } volatile int64_t ref_cnt_; int64_t tenant_id_; int64_t job_id_; common::ObString table_name_; common::ObString file_path_; int64_t table_column_; int64_t file_column_; int64_t batch_size_; int64_t parallel_; int64_t load_mode_; int64_t start_time_; int64_t estimated_remaining_time_; int64_t total_bytes_; int64_t read_bytes_; //bytes read to memory int64_t processed_bytes_; int64_t processed_rows_; int64_t total_shuffle_task_; int64_t total_insert_task_; int64_t shuffle_rt_sum_; int64_t insert_rt_sum_; int64_t total_wait_secs_; TO_STRING_KV(K(job_id_), K(table_name_), K(total_bytes_), K(processed_bytes_)); }; class ObGetAllJobStatusOp { public: ObGetAllJobStatusOp(); ~ObGetAllJobStatusOp(); public: void reset(); int operator()(common::hash::HashMapPair &entry); ObLoadDataStat* next_job_status(void); bool end(void); private: common::ObSEArray job_status_array_; int32_t current_job_index_; }; class ObGlobalLoadDataStatMap { public: static ObGlobalLoadDataStatMap *getInstance(); ObGlobalLoadDataStatMap() : is_inited_(false) {} int init(); int register_job(const ObLoadDataGID &id, ObLoadDataStat *job_status); int unregister_job(const ObLoadDataGID &id, ObLoadDataStat *&job_status); int get_job_status(const ObLoadDataGID &id, ObLoadDataStat *&job_status); int get_all_job_status(ObGetAllJobStatusOp &job_status_op); private: typedef common::hash::ObHashMap HASH_MAP; static const int64_t bucket_num = 1000; static ObGlobalLoadDataStatMap *instance_; HASH_MAP map_; bool is_inited_; }; } } #endif // OCEANBASE_SQL_ENGINE_CMD_LOAD_DATA_UTILS_H_