Co-authored-by: oceanoverflow <oceanoverflow@gmail.com> Co-authored-by: hezuojiao <hezuojiao@gmail.com> Co-authored-by: Monk-Liu <1152761042@qq.com>
		
			
				
	
	
		
			299 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			299 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
 | 
						|
#define USING_LOG_PREFIX COMMON
 | 
						|
#include "deps/oblib/src/common/object/ob_obj_type.h"
 | 
						|
#include "test_op_engine.h"
 | 
						|
#include "ob_fake_table_scan_vec_op.h"
 | 
						|
#include "data_generator.h"
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
namespace sql
 | 
						|
{
 | 
						|
int ObFakeTableScanVecOp::inner_open()
 | 
						|
{
 | 
						|
  ObDataGenerator::get_instance().register_op(this);
 | 
						|
  std::string round;
 | 
						|
  ObTestOpConfig::get_instance().get_config("round", round);
 | 
						|
  if (!round.empty()) { max_round_ = current_round_ + std::stoi(round); }
 | 
						|
 | 
						|
  return OB_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
int ObFakeTableScanVecOp::inner_get_next_batch(const int64_t max_row_cnt)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  clear_evaluated_flag();
 | 
						|
  uint64_t op_id = get_spec().get_id();
 | 
						|
  int64_t generate_random_value = 0;
 | 
						|
  bool is_duplicate = false;
 | 
						|
 | 
						|
  const ObPushdownExprSpec &pd_expr_spec =
 | 
						|
    reinterpret_cast<const ObTableScanSpec *>(&spec_)->tsc_ctdef_.scan_ctdef_.pd_expr_spec_;
 | 
						|
  for (int j = 0; j < pd_expr_spec.access_exprs_.count(); j++) {
 | 
						|
    ObExpr *expr = pd_expr_spec.access_exprs_.at(j);
 | 
						|
    OB_FAIL(fill_random_data_into_expr_datum_frame(j, pd_expr_spec.access_exprs_.count(), expr,
 | 
						|
                                                   ObTestOpConfig::get_instance().batch_size_, is_duplicate));
 | 
						|
  }
 | 
						|
 | 
						|
  // random set skip
 | 
						|
  set_random_skip(current_round_, ObTestOpConfig::get_instance().batch_size_);
 | 
						|
 | 
						|
  if (is_duplicate) { ObDataGenerator::get_instance().reset_temp_store(op_id, current_round_); }
 | 
						|
 | 
						|
  current_round_++;
 | 
						|
  brs_.size_ = ObTestOpConfig::get_instance().batch_size_;
 | 
						|
  if (current_round_ == max_round_) { brs_.end_ = true; }
 | 
						|
 | 
						|
  // print generate data
 | 
						|
  LOG_INFO("[DG] data generated by DataGenerator in ", K(current_round_ - 1));
 | 
						|
  if (op_id_2_output_streams_.count(op_id) == 0) {
 | 
						|
    std::string output_file_name = "generate_data_" + std::to_string(op_id) + ".data";
 | 
						|
    op_id_2_output_streams_[op_id].open(output_file_name.data(), std::ios::out | std::ios::trunc);
 | 
						|
  }
 | 
						|
  test::TestOpEngine::print_to_file(&brs_, this, pd_expr_spec.access_exprs_, false, &op_id_2_output_streams_[op_id]);
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int ObFakeTableScanVecOp::fill_random_data_into_expr_datum_frame(int expr_i, int expr_count, const ObExpr *expr,
 | 
						|
                                                                 const int output_row_count, bool &is_duplicate)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  LOG_DEBUG("generate_random_value for expr:  ", K(*expr));
 | 
						|
 | 
						|
  ObIVector *i_vec = NULL;
 | 
						|
  ObDatum *datums = NULL;
 | 
						|
 | 
						|
  if (expr->enable_rich_format()) {
 | 
						|
    // vectorization 2.0 new operator
 | 
						|
    i_vec = expr->get_vector(eval_ctx_);
 | 
						|
    if (expr->is_fixed_length_data_) {
 | 
						|
      // for VEC_FIXED and VEC_UNIFORM
 | 
						|
      // we must first call init_vector because we need to use i_vec->set_xxx() which need use vector's meta data
 | 
						|
      OB_ASSERT(ObTestOpConfig::get_instance().digit_data_format_ == VEC_UNIFORM
 | 
						|
                || ObTestOpConfig::get_instance().digit_data_format_ == VEC_FIXED);
 | 
						|
      expr->init_vector(eval_ctx_, ObTestOpConfig::get_instance().digit_data_format_, output_row_count, true);
 | 
						|
    } else {
 | 
						|
      // while for VEC_DISCRETE and VEC_CONTINUOUS it doesn't matter
 | 
						|
      OB_ASSERT(ObTestOpConfig::get_instance().string_data_format_ == VEC_UNIFORM
 | 
						|
                || ObTestOpConfig::get_instance().string_data_format_ == VEC_DISCRETE
 | 
						|
                || ObTestOpConfig::get_instance().string_data_format_ == VEC_CONTINUOUS);
 | 
						|
      expr->init_vector(eval_ctx_, ObTestOpConfig::get_instance().string_data_format_, output_row_count, true);
 | 
						|
    }
 | 
						|
  } else {
 | 
						|
    // vectorization 1.0 old operator
 | 
						|
    datums = expr->locate_datums_for_update(eval_ctx_, output_row_count);
 | 
						|
  }
 | 
						|
 | 
						|
  // generate random data
 | 
						|
  get_random_data(expr_i, expr_count, expr, current_round_, output_row_count, expr->max_length_, is_duplicate);
 | 
						|
 | 
						|
  int vec_continuous_offset = 0;   // only used in VEC_CONTINUOUS
 | 
						|
  std::string vec_continuous_data; // use to store data of VEC_CONTINUOUS temporarily
 | 
						|
 | 
						|
  ObDataGenerator::TempDataStore &data_store =
 | 
						|
    ObDataGenerator::get_instance().op_2_round_2_temp_store_[get_spec().get_id()][current_round_][expr_i];
 | 
						|
  for (int row = 0; row < output_row_count; row++) {
 | 
						|
    bool is_null = data_store.null_[row];
 | 
						|
 | 
						|
    switch (expr->datum_meta_.get_type()) {
 | 
						|
    // why expr->datum_meta_.get_type() == ObInt32Type while expr->res_buf_len_ == 8 ?????
 | 
						|
    case ObInt32Type: {
 | 
						|
      int data = data_store.temp_int32_vector_[row];
 | 
						|
      if (i_vec != NULL) {
 | 
						|
        i_vec->set_int(row, static_cast<int64_t>(data));
 | 
						|
        if (is_null) { i_vec->set_null(row); }
 | 
						|
      } else {
 | 
						|
        datums[row].set_int32(data);
 | 
						|
        if (is_null) { datums[row].set_null(); }
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    case ObIntType: {
 | 
						|
      int64_t data = data_store.temp_int64_vector_[row];
 | 
						|
      if (i_vec != NULL) {
 | 
						|
        i_vec->set_int(row, data);
 | 
						|
        if (is_null) { i_vec->set_null(row); }
 | 
						|
      } else {
 | 
						|
        datums[row].set_int(data);
 | 
						|
        if (is_null) { datums[row].set_null(); }
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    case ObDoubleType: {
 | 
						|
      double data = data_store.temp_double_vector_[row];
 | 
						|
      if (i_vec != NULL) {
 | 
						|
        i_vec->set_double(row, data);
 | 
						|
        if (is_null) { i_vec->set_null(row); }
 | 
						|
      } else {
 | 
						|
        datums[row].set_double(data);
 | 
						|
        if (is_null) { datums[row].set_null(); }
 | 
						|
      }
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    case ObNumberType: {
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    case ObDecimalIntType: {
 | 
						|
      // have bug
 | 
						|
      // if decimal(20, 10)
 | 
						|
      // expr->res_buf_len_ = 16 is *not* equal to datums[x].len_ == 8
 | 
						|
      // where datums[x].len_ is set?
 | 
						|
      // i_vec->set_int(row,
 | 
						|
      //                ObDataGenerator::get_instance().round_2_temp_store_[round_].first[expr_i].temp_int64_vector_[row]);
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    case ObVarcharType:
 | 
						|
    case ObCharType: {
 | 
						|
      std::string tmp_str = data_store.temp_string_vector_[row];
 | 
						|
      int str_len = static_cast<int>(tmp_str.size());
 | 
						|
 | 
						|
      if (i_vec != NULL) {
 | 
						|
        // vectorization 1.0 new operator
 | 
						|
        if (expr->get_format(eval_ctx_) == VEC_UNIFORM) {
 | 
						|
          if (is_null) {
 | 
						|
            i_vec->set_null(row);
 | 
						|
          } else {
 | 
						|
            i_vec->set_payload(row, tmp_str.data(), str_len);
 | 
						|
          }
 | 
						|
        } else {
 | 
						|
          if (is_null) { i_vec->set_null(row); }
 | 
						|
          if (expr->get_format(eval_ctx_) == VEC_DISCRETE) {
 | 
						|
            char **ptrs = expr->get_discrete_vector_ptrs(eval_ctx_);
 | 
						|
            int32_t *lens = expr->get_discrete_vector_lens(eval_ctx_);
 | 
						|
            ptrs[row] = expr->get_res_buf(eval_ctx_) + row * (expr->res_buf_len_);
 | 
						|
            lens[row] = str_len;
 | 
						|
 | 
						|
            i_vec->set_payload(row, tmp_str.data(), str_len);
 | 
						|
          } else {
 | 
						|
            // VEC_CONTINUOUS
 | 
						|
            // offset[0] == 0 which is set in cg phase
 | 
						|
            vec_continuous_offset += str_len;
 | 
						|
            uint32_t *offset = expr->get_continuous_vector_offsets(eval_ctx_);
 | 
						|
            offset[row + 1] = vec_continuous_offset;
 | 
						|
            vec_continuous_data += tmp_str; // temporarily store data here
 | 
						|
          }
 | 
						|
        }
 | 
						|
      } else {
 | 
						|
        // vectorization 1.0 old operator
 | 
						|
        if (is_null) {
 | 
						|
          datums[row].set_null();
 | 
						|
        } else {
 | 
						|
          datums[row].set_string(tmp_str.data(), str_len);
 | 
						|
        }
 | 
						|
      }
 | 
						|
 | 
						|
      break;
 | 
						|
    }
 | 
						|
    default: LOG_INFO("Can not generate random value so far for: ", K(expr->datum_meta_.get_type()));
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  if (expr->enable_rich_format() && expr->get_format(eval_ctx_) == VEC_CONTINUOUS && !vec_continuous_data.empty()) {
 | 
						|
    ObDynReserveBuf *drb = reinterpret_cast<ObDynReserveBuf *>(expr->get_continuous_vector_data(eval_ctx_));
 | 
						|
    // Todo: replace below logic when shengle handle continuous format memory
 | 
						|
    // in ObExpr::get_str_res_mem()
 | 
						|
    char *mem = NULL;
 | 
						|
    const int64_t alloc_size = next_pow2(vec_continuous_data.size());
 | 
						|
    if (OB_UNLIKELY(alloc_size > UINT32_MAX)) {
 | 
						|
      ret = OB_INVALID_ARGUMENT;
 | 
						|
      LOG_WARN("invalid argument", K(vec_continuous_data.size()), K(alloc_size), K(ret));
 | 
						|
    } else if (OB_ISNULL(mem = static_cast<char *>(eval_ctx_.alloc_expr_res(alloc_size)))) {
 | 
						|
      ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
						|
      LOG_WARN("allocate memory failed", K(ret), K(ret));
 | 
						|
    } else {
 | 
						|
      // When extend memory, the old memory can not free, because the old memory may
 | 
						|
      // still be referenced. see: ob_datum_cast.cpp::common_copy_string
 | 
						|
      if (0 == drb->len_) { drb->magic_ = ObDynReserveBuf::MAGIC_NUM; }
 | 
						|
      drb->len_ = alloc_size;
 | 
						|
      drb->mem_ = mem;
 | 
						|
      MEMCPY(drb->mem_, vec_continuous_data.data(), vec_continuous_data.size());
 | 
						|
 | 
						|
      ObContinuousBase *cont_vec = static_cast<ObContinuousBase *>(expr->get_vector(eval_ctx_));
 | 
						|
      cont_vec->set_data(drb->mem_);
 | 
						|
    }
 | 
						|
    LOG_DEBUG("extend expr result memory", K(ret), K(vec_continuous_data.size()), K(alloc_size), KP(this), KP(mem));
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int ObFakeTableScanVecOp::get_random_data(int expr_i, int expr_count, const ObExpr *expr, const int round,
 | 
						|
                                          const int batch_size, const int len, bool &is_duplicate)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  uint64_t op_id = get_spec().get_id();
 | 
						|
  switch (expr->datum_meta_.get_type()) {
 | 
						|
  // why expr->datum_meta_.get_type() == ObInt32Type while expr->res_buf_len_ == 8 ?????
 | 
						|
  case ObInt32Type: {
 | 
						|
    ObDataGenerator::get_instance().generate_data<int>(op_id, expr_i, expr_count, round, batch_size, len, is_duplicate);
 | 
						|
    break;
 | 
						|
  }
 | 
						|
  case ObIntType: {
 | 
						|
    ObDataGenerator::get_instance().generate_data<int64_t>(op_id, expr_i, expr_count, round, batch_size, len,
 | 
						|
                                                           is_duplicate);
 | 
						|
    break;
 | 
						|
  }
 | 
						|
  case ObDoubleType: {
 | 
						|
    ObDataGenerator::get_instance().generate_data<double>(op_id, expr_i, expr_count, round, batch_size, len,
 | 
						|
                                                          is_duplicate);
 | 
						|
    break;
 | 
						|
  }
 | 
						|
  case ObNumberType: {
 | 
						|
    break;
 | 
						|
  }
 | 
						|
  case ObDecimalIntType: {
 | 
						|
    // have bug
 | 
						|
    // if decimal(20, 10)
 | 
						|
    // expr->res_buf_len_ = 16 is *not* equal to datums[x].len_ == 8
 | 
						|
    // where datums[x].len_ is set?
 | 
						|
    ObDataGenerator::get_instance().generate_data<int64_t>(op_id, expr_i, expr_count, round, batch_size, len,
 | 
						|
                                                           is_duplicate);
 | 
						|
    break;
 | 
						|
  }
 | 
						|
  case ObVarcharType:
 | 
						|
  case ObCharType: {
 | 
						|
    ObDataGenerator::get_instance().generate_data<std::string>(op_id, expr_i, expr_count, round, batch_size, len,
 | 
						|
                                                               is_duplicate);
 | 
						|
    break;
 | 
						|
  }
 | 
						|
  default: LOG_INFO("Can not generate random value so far for: ", K(expr->datum_meta_.get_type()));
 | 
						|
  }
 | 
						|
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
void ObFakeTableScanVecOp::set_random_skip(const int round, const int batch_size)
 | 
						|
{
 | 
						|
  std::string generate_data_skips;
 | 
						|
  uint64_t op_id = get_spec().get_id();
 | 
						|
  if (ObDataGenerator::get_instance().op_2_round_2_skips_[op_id].count(round) == 0) {
 | 
						|
    std::uniform_int_distribution<int> u_i(0, 1);
 | 
						|
    bool if_skip;
 | 
						|
    for (int i = 0; i < batch_size; i++) {
 | 
						|
      if_skip =
 | 
						|
        ObDataGenerator::get_instance().zero_one_rand_by_probability(ObTestOpConfig::get_instance().skips_probability_);
 | 
						|
      generate_data_skips += std::to_string(!if_skip) + " ";
 | 
						|
      ObDataGenerator::get_instance().op_2_round_2_skips_[op_id][round].push_back(if_skip);
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  for (int i = 0; i < batch_size; i++) {
 | 
						|
    if (ObDataGenerator::get_instance().op_2_round_2_skips_[op_id][round][i] == true) { brs_.skip_->set(i); }
 | 
						|
  }
 | 
						|
  LOG_INFO("skips : ", K(generate_data_skips.data()));
 | 
						|
  return;
 | 
						|
}
 | 
						|
 | 
						|
} // namespace sql
 | 
						|
} // namespace oceanbase
 |