 0d0b11c0cc
			
		
	
	0d0b11c0cc
	
	
	
		
			
			Co-authored-by: oceanoverflow <oceanoverflow@gmail.com> Co-authored-by: hezuojiao <hezuojiao@gmail.com> Co-authored-by: Monk-Liu <1152761042@qq.com>
		
			
				
	
	
		
			299 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			299 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| #define USING_LOG_PREFIX COMMON
 | |
| #include "deps/oblib/src/common/object/ob_obj_type.h"
 | |
| #include "test_op_engine.h"
 | |
| #include "ob_fake_table_scan_vec_op.h"
 | |
| #include "data_generator.h"
 | |
| 
 | |
| namespace oceanbase
 | |
| {
 | |
| namespace sql
 | |
| {
 | |
| int ObFakeTableScanVecOp::inner_open()
 | |
| {
 | |
|   ObDataGenerator::get_instance().register_op(this);
 | |
|   std::string round;
 | |
|   ObTestOpConfig::get_instance().get_config("round", round);
 | |
|   if (!round.empty()) { max_round_ = current_round_ + std::stoi(round); }
 | |
| 
 | |
|   return OB_SUCCESS;
 | |
| }
 | |
| 
 | |
| int ObFakeTableScanVecOp::inner_get_next_batch(const int64_t max_row_cnt)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   clear_evaluated_flag();
 | |
|   uint64_t op_id = get_spec().get_id();
 | |
|   int64_t generate_random_value = 0;
 | |
|   bool is_duplicate = false;
 | |
| 
 | |
|   const ObPushdownExprSpec &pd_expr_spec =
 | |
|     reinterpret_cast<const ObTableScanSpec *>(&spec_)->tsc_ctdef_.scan_ctdef_.pd_expr_spec_;
 | |
|   for (int j = 0; j < pd_expr_spec.access_exprs_.count(); j++) {
 | |
|     ObExpr *expr = pd_expr_spec.access_exprs_.at(j);
 | |
|     OB_FAIL(fill_random_data_into_expr_datum_frame(j, pd_expr_spec.access_exprs_.count(), expr,
 | |
|                                                    ObTestOpConfig::get_instance().batch_size_, is_duplicate));
 | |
|   }
 | |
| 
 | |
|   // random set skip
 | |
|   set_random_skip(current_round_, ObTestOpConfig::get_instance().batch_size_);
 | |
| 
 | |
|   if (is_duplicate) { ObDataGenerator::get_instance().reset_temp_store(op_id, current_round_); }
 | |
| 
 | |
|   current_round_++;
 | |
|   brs_.size_ = ObTestOpConfig::get_instance().batch_size_;
 | |
|   if (current_round_ == max_round_) { brs_.end_ = true; }
 | |
| 
 | |
|   // print generate data
 | |
|   LOG_INFO("[DG] data generated by DataGenerator in ", K(current_round_ - 1));
 | |
|   if (op_id_2_output_streams_.count(op_id) == 0) {
 | |
|     std::string output_file_name = "generate_data_" + std::to_string(op_id) + ".data";
 | |
|     op_id_2_output_streams_[op_id].open(output_file_name.data(), std::ios::out | std::ios::trunc);
 | |
|   }
 | |
|   test::TestOpEngine::print_to_file(&brs_, this, pd_expr_spec.access_exprs_, false, &op_id_2_output_streams_[op_id]);
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int ObFakeTableScanVecOp::fill_random_data_into_expr_datum_frame(int expr_i, int expr_count, const ObExpr *expr,
 | |
|                                                                  const int output_row_count, bool &is_duplicate)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   LOG_DEBUG("generate_random_value for expr:  ", K(*expr));
 | |
| 
 | |
|   ObIVector *i_vec = NULL;
 | |
|   ObDatum *datums = NULL;
 | |
| 
 | |
|   if (expr->enable_rich_format()) {
 | |
|     // vectorization 2.0 new operator
 | |
|     i_vec = expr->get_vector(eval_ctx_);
 | |
|     if (expr->is_fixed_length_data_) {
 | |
|       // for VEC_FIXED and VEC_UNIFORM
 | |
|       // we must first call init_vector because we need to use i_vec->set_xxx() which need use vector's meta data
 | |
|       OB_ASSERT(ObTestOpConfig::get_instance().digit_data_format_ == VEC_UNIFORM
 | |
|                 || ObTestOpConfig::get_instance().digit_data_format_ == VEC_FIXED);
 | |
|       expr->init_vector(eval_ctx_, ObTestOpConfig::get_instance().digit_data_format_, output_row_count, true);
 | |
|     } else {
 | |
|       // while for VEC_DISCRETE and VEC_CONTINUOUS it doesn't matter
 | |
|       OB_ASSERT(ObTestOpConfig::get_instance().string_data_format_ == VEC_UNIFORM
 | |
|                 || ObTestOpConfig::get_instance().string_data_format_ == VEC_DISCRETE
 | |
|                 || ObTestOpConfig::get_instance().string_data_format_ == VEC_CONTINUOUS);
 | |
|       expr->init_vector(eval_ctx_, ObTestOpConfig::get_instance().string_data_format_, output_row_count, true);
 | |
|     }
 | |
|   } else {
 | |
|     // vectorization 1.0 old operator
 | |
|     datums = expr->locate_datums_for_update(eval_ctx_, output_row_count);
 | |
|   }
 | |
| 
 | |
|   // generate random data
 | |
|   get_random_data(expr_i, expr_count, expr, current_round_, output_row_count, expr->max_length_, is_duplicate);
 | |
| 
 | |
|   int vec_continuous_offset = 0;   // only used in VEC_CONTINUOUS
 | |
|   std::string vec_continuous_data; // use to store data of VEC_CONTINUOUS temporarily
 | |
| 
 | |
|   ObDataGenerator::TempDataStore &data_store =
 | |
|     ObDataGenerator::get_instance().op_2_round_2_temp_store_[get_spec().get_id()][current_round_][expr_i];
 | |
|   for (int row = 0; row < output_row_count; row++) {
 | |
|     bool is_null = data_store.null_[row];
 | |
| 
 | |
|     switch (expr->datum_meta_.get_type()) {
 | |
|     // why expr->datum_meta_.get_type() == ObInt32Type while expr->res_buf_len_ == 8 ?????
 | |
|     case ObInt32Type: {
 | |
|       int data = data_store.temp_int32_vector_[row];
 | |
|       if (i_vec != NULL) {
 | |
|         i_vec->set_int(row, static_cast<int64_t>(data));
 | |
|         if (is_null) { i_vec->set_null(row); }
 | |
|       } else {
 | |
|         datums[row].set_int32(data);
 | |
|         if (is_null) { datums[row].set_null(); }
 | |
|       }
 | |
|       break;
 | |
|     }
 | |
|     case ObIntType: {
 | |
|       int64_t data = data_store.temp_int64_vector_[row];
 | |
|       if (i_vec != NULL) {
 | |
|         i_vec->set_int(row, data);
 | |
|         if (is_null) { i_vec->set_null(row); }
 | |
|       } else {
 | |
|         datums[row].set_int(data);
 | |
|         if (is_null) { datums[row].set_null(); }
 | |
|       }
 | |
|       break;
 | |
|     }
 | |
|     case ObDoubleType: {
 | |
|       double data = data_store.temp_double_vector_[row];
 | |
|       if (i_vec != NULL) {
 | |
|         i_vec->set_double(row, data);
 | |
|         if (is_null) { i_vec->set_null(row); }
 | |
|       } else {
 | |
|         datums[row].set_double(data);
 | |
|         if (is_null) { datums[row].set_null(); }
 | |
|       }
 | |
|       break;
 | |
|     }
 | |
|     case ObNumberType: {
 | |
|       break;
 | |
|     }
 | |
|     case ObDecimalIntType: {
 | |
|       // have bug
 | |
|       // if decimal(20, 10)
 | |
|       // expr->res_buf_len_ = 16 is *not* equal to datums[x].len_ == 8
 | |
|       // where datums[x].len_ is set?
 | |
|       // i_vec->set_int(row,
 | |
|       //                ObDataGenerator::get_instance().round_2_temp_store_[round_].first[expr_i].temp_int64_vector_[row]);
 | |
|       break;
 | |
|     }
 | |
|     case ObVarcharType:
 | |
|     case ObCharType: {
 | |
|       std::string tmp_str = data_store.temp_string_vector_[row];
 | |
|       int str_len = static_cast<int>(tmp_str.size());
 | |
| 
 | |
|       if (i_vec != NULL) {
 | |
|         // vectorization 1.0 new operator
 | |
|         if (expr->get_format(eval_ctx_) == VEC_UNIFORM) {
 | |
|           if (is_null) {
 | |
|             i_vec->set_null(row);
 | |
|           } else {
 | |
|             i_vec->set_payload(row, tmp_str.data(), str_len);
 | |
|           }
 | |
|         } else {
 | |
|           if (is_null) { i_vec->set_null(row); }
 | |
|           if (expr->get_format(eval_ctx_) == VEC_DISCRETE) {
 | |
|             char **ptrs = expr->get_discrete_vector_ptrs(eval_ctx_);
 | |
|             int32_t *lens = expr->get_discrete_vector_lens(eval_ctx_);
 | |
|             ptrs[row] = expr->get_res_buf(eval_ctx_) + row * (expr->res_buf_len_);
 | |
|             lens[row] = str_len;
 | |
| 
 | |
|             i_vec->set_payload(row, tmp_str.data(), str_len);
 | |
|           } else {
 | |
|             // VEC_CONTINUOUS
 | |
|             // offset[0] == 0 which is set in cg phase
 | |
|             vec_continuous_offset += str_len;
 | |
|             uint32_t *offset = expr->get_continuous_vector_offsets(eval_ctx_);
 | |
|             offset[row + 1] = vec_continuous_offset;
 | |
|             vec_continuous_data += tmp_str; // temporarily store data here
 | |
|           }
 | |
|         }
 | |
|       } else {
 | |
|         // vectorization 1.0 old operator
 | |
|         if (is_null) {
 | |
|           datums[row].set_null();
 | |
|         } else {
 | |
|           datums[row].set_string(tmp_str.data(), str_len);
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       break;
 | |
|     }
 | |
|     default: LOG_INFO("Can not generate random value so far for: ", K(expr->datum_meta_.get_type()));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if (expr->enable_rich_format() && expr->get_format(eval_ctx_) == VEC_CONTINUOUS && !vec_continuous_data.empty()) {
 | |
|     ObDynReserveBuf *drb = reinterpret_cast<ObDynReserveBuf *>(expr->get_continuous_vector_data(eval_ctx_));
 | |
|     // Todo: replace below logic when shengle handle continuous format memory
 | |
|     // in ObExpr::get_str_res_mem()
 | |
|     char *mem = NULL;
 | |
|     const int64_t alloc_size = next_pow2(vec_continuous_data.size());
 | |
|     if (OB_UNLIKELY(alloc_size > UINT32_MAX)) {
 | |
|       ret = OB_INVALID_ARGUMENT;
 | |
|       LOG_WARN("invalid argument", K(vec_continuous_data.size()), K(alloc_size), K(ret));
 | |
|     } else if (OB_ISNULL(mem = static_cast<char *>(eval_ctx_.alloc_expr_res(alloc_size)))) {
 | |
|       ret = OB_ALLOCATE_MEMORY_FAILED;
 | |
|       LOG_WARN("allocate memory failed", K(ret), K(ret));
 | |
|     } else {
 | |
|       // When extend memory, the old memory can not free, because the old memory may
 | |
|       // still be referenced. see: ob_datum_cast.cpp::common_copy_string
 | |
|       if (0 == drb->len_) { drb->magic_ = ObDynReserveBuf::MAGIC_NUM; }
 | |
|       drb->len_ = alloc_size;
 | |
|       drb->mem_ = mem;
 | |
|       MEMCPY(drb->mem_, vec_continuous_data.data(), vec_continuous_data.size());
 | |
| 
 | |
|       ObContinuousBase *cont_vec = static_cast<ObContinuousBase *>(expr->get_vector(eval_ctx_));
 | |
|       cont_vec->set_data(drb->mem_);
 | |
|     }
 | |
|     LOG_DEBUG("extend expr result memory", K(ret), K(vec_continuous_data.size()), K(alloc_size), KP(this), KP(mem));
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int ObFakeTableScanVecOp::get_random_data(int expr_i, int expr_count, const ObExpr *expr, const int round,
 | |
|                                           const int batch_size, const int len, bool &is_duplicate)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   uint64_t op_id = get_spec().get_id();
 | |
|   switch (expr->datum_meta_.get_type()) {
 | |
|   // why expr->datum_meta_.get_type() == ObInt32Type while expr->res_buf_len_ == 8 ?????
 | |
|   case ObInt32Type: {
 | |
|     ObDataGenerator::get_instance().generate_data<int>(op_id, expr_i, expr_count, round, batch_size, len, is_duplicate);
 | |
|     break;
 | |
|   }
 | |
|   case ObIntType: {
 | |
|     ObDataGenerator::get_instance().generate_data<int64_t>(op_id, expr_i, expr_count, round, batch_size, len,
 | |
|                                                            is_duplicate);
 | |
|     break;
 | |
|   }
 | |
|   case ObDoubleType: {
 | |
|     ObDataGenerator::get_instance().generate_data<double>(op_id, expr_i, expr_count, round, batch_size, len,
 | |
|                                                           is_duplicate);
 | |
|     break;
 | |
|   }
 | |
|   case ObNumberType: {
 | |
|     break;
 | |
|   }
 | |
|   case ObDecimalIntType: {
 | |
|     // have bug
 | |
|     // if decimal(20, 10)
 | |
|     // expr->res_buf_len_ = 16 is *not* equal to datums[x].len_ == 8
 | |
|     // where datums[x].len_ is set?
 | |
|     ObDataGenerator::get_instance().generate_data<int64_t>(op_id, expr_i, expr_count, round, batch_size, len,
 | |
|                                                            is_duplicate);
 | |
|     break;
 | |
|   }
 | |
|   case ObVarcharType:
 | |
|   case ObCharType: {
 | |
|     ObDataGenerator::get_instance().generate_data<std::string>(op_id, expr_i, expr_count, round, batch_size, len,
 | |
|                                                                is_duplicate);
 | |
|     break;
 | |
|   }
 | |
|   default: LOG_INFO("Can not generate random value so far for: ", K(expr->datum_meta_.get_type()));
 | |
|   }
 | |
| 
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void ObFakeTableScanVecOp::set_random_skip(const int round, const int batch_size)
 | |
| {
 | |
|   std::string generate_data_skips;
 | |
|   uint64_t op_id = get_spec().get_id();
 | |
|   if (ObDataGenerator::get_instance().op_2_round_2_skips_[op_id].count(round) == 0) {
 | |
|     std::uniform_int_distribution<int> u_i(0, 1);
 | |
|     bool if_skip;
 | |
|     for (int i = 0; i < batch_size; i++) {
 | |
|       if_skip =
 | |
|         ObDataGenerator::get_instance().zero_one_rand_by_probability(ObTestOpConfig::get_instance().skips_probability_);
 | |
|       generate_data_skips += std::to_string(!if_skip) + " ";
 | |
|       ObDataGenerator::get_instance().op_2_round_2_skips_[op_id][round].push_back(if_skip);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   for (int i = 0; i < batch_size; i++) {
 | |
|     if (ObDataGenerator::get_instance().op_2_round_2_skips_[op_id][round][i] == true) { brs_.skip_->set(i); }
 | |
|   }
 | |
|   LOG_INFO("skips : ", K(generate_data_skips.data()));
 | |
|   return;
 | |
| }
 | |
| 
 | |
| } // namespace sql
 | |
| } // namespace oceanbase
 |