diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.cpp b/src/sql/engine/table/ob_parquet_table_row_iter.cpp index 8206fe5b3..14aa40f47 100644 --- a/src/sql/engine/table/ob_parquet_table_row_iter.cpp +++ b/src/sql/engine/table/ob_parquet_table_row_iter.cpp @@ -203,6 +203,7 @@ int ObParquetTableRowIterator::init(const storage::ObTableScanParam *scan_param) ObEvalCtx &eval_ctx = scan_param->op_->get_eval_ctx(); mem_attr_ = ObMemAttr(MTL_ID(), "ParquetRowIter"); allocator_.set_attr(mem_attr_); + str_res_mem_.set_attr(mem_attr_); arrow_alloc_.init(MTL_ID()); OZ (ObExternalTableRowIterator::init(scan_param)); OZ (data_access_driver_.init(scan_param->external_file_location_, @@ -595,13 +596,13 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int32_vec() values.get_data(), &values_cnt); int j = 0; if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) { - MEMCPY(pointer_cast(dec_vec->get_data()), values.get_data(), sizeof(int32_t) * row_count_); + MEMCPY(pointer_cast(dec_vec->get_data()) + row_offset_, values.get_data(), sizeof(int32_t) * row_count_); } else { for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { - dec_vec->set_int32(i, values.at(j++)); + dec_vec->set_int32(i + row_offset_, values.at(j++)); } } } @@ -771,9 +772,9 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_); } else { - OZ (to_numeric(i, values.at(j++))); + OZ (to_numeric(i + row_offset_, values.at(j++))); } } } else if (reader_->descr()->physical_type() == parquet::Type::type::INT64) { @@ -785,9 +786,9 @@ int 
ObParquetTableRowIterator::DataLoader::load_decimal_any_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_); } else { - OZ (to_numeric(i, values.at(j++))); + OZ (to_numeric(i + row_offset_, values.at(j++))); } } } else if (reader_->descr()->physical_type() == parquet::Type::Type::FIXED_LEN_BYTE_ARRAY) { @@ -803,10 +804,10 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_); } else { parquet::FixedLenByteArray &cur_v = values.at(j++); - OZ (to_numeric_hive(i, pointer_cast(cur_v.ptr), fixed_length, buffer.get_data(), buffer.count())); + OZ (to_numeric_hive(i + row_offset_, pointer_cast(cur_v.ptr), fixed_length, buffer.get_data(), buffer.count())); //OZ (to_numeric(i, pointer_cast(cur_v.ptr), fixed_length)); } } @@ -822,10 +823,10 @@ int ObParquetTableRowIterator::DataLoader::load_decimal_any_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + file_col_expr_->get_vector(eval_ctx_)->set_null(i + row_offset_); } else { parquet::ByteArray &cur_v = values.at(j++); - OZ (to_numeric_hive(i, pointer_cast(cur_v.ptr), cur_v.len, buffer.get_data(), buffer.count())); + OZ (to_numeric_hive(i + row_offset_, pointer_cast(cur_v.ptr), cur_v.len, buffer.get_data(), buffer.count())); //OZ (to_numeric(i, pointer_cast(cur_v.ptr), cur_v.len)); } } @@ -859,16 +860,30 @@ int ObParquetTableRowIterator::DataLoader::load_fixed_string_col() int j = 0; for (int i = 0; i < row_count_; i++) { if 
(IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - text_vec->set_null(i); + text_vec->set_null(i + row_offset_); } else { + void *res_ptr = NULL; parquet::FixedLenByteArray &cur_v = values.at(j++); - text_vec->set_string(i, pointer_cast(cur_v.ptr), fixed_length); if (OB_UNLIKELY(fixed_length > file_col_expr_->max_length_ && (is_byte_length || ObCharset::strlen_char(CS_TYPE_UTF8MB4_BIN, pointer_cast(cur_v.ptr), fixed_length) > file_col_expr_->max_length_))) { ret = OB_ERR_DATA_TOO_LONG; LOG_WARN("data too long", K(ret)); + } else { + if (row_count_ == batch_size_) { + res_ptr = (void*)(cur_v.ptr); + } else { + //when row_count_ less than batch_size_, it may reach page end and reload next page + //string values need deep copy + if (OB_ISNULL(res_ptr = str_res_mem_.alloc(fixed_length))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate memory", K(fixed_length)); + } else { + MEMCPY(res_ptr, cur_v.ptr, fixed_length); + } + } + text_vec->set_string(i + row_offset_, pointer_cast(res_ptr), fixed_length); } } } @@ -902,19 +917,33 @@ int ObParquetTableRowIterator::DataLoader::load_string_col() int j = 0; for (int i = 0; i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - text_vec->set_null(i); + text_vec->set_null(i + row_offset_); } else { parquet::ByteArray &cur_v = values.at(j++); if (is_oracle_mode && 0 == cur_v.len) { - text_vec->set_null(i); + text_vec->set_null(i + row_offset_); } else { - text_vec->set_string(i, pointer_cast(cur_v.ptr), cur_v.len); + void *res_ptr = NULL; if (OB_UNLIKELY(cur_v.len > file_col_expr_->max_length_ && (is_byte_length || ObCharset::strlen_char(CS_TYPE_UTF8MB4_BIN, pointer_cast(cur_v.ptr), cur_v.len) > file_col_expr_->max_length_))) { ret = OB_ERR_DATA_TOO_LONG; LOG_WARN("data too long", K(ret)); + } else { + if (row_count_ == batch_size_) { + res_ptr = (void *)(cur_v.ptr); + } else { + //when row_count_ less than batch_size_, it may reach page end and reload next page + //string 
values need deep copy + if (OB_ISNULL(res_ptr = str_res_mem_.alloc(cur_v.len))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate memory", K(cur_v.len)); + } else { + MEMCPY(res_ptr, cur_v.ptr, cur_v.len); + } + } + text_vec->set_string(i + row_offset_, pointer_cast(res_ptr), cur_v.len); } } } @@ -946,9 +975,9 @@ int ObParquetTableRowIterator::DataLoader::load_int32_to_int64_vec() int j = 0; for (int i = 0; i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - int32_vec->set_null(i); + int32_vec->set_null(i + row_offset_); } else { - int32_vec->set_int(i, values.at(j++)); + int32_vec->set_int(i + row_offset_, values.at(j++)); } } } @@ -975,14 +1004,14 @@ int ObParquetTableRowIterator::DataLoader::load_int64_to_int64_vec() ret = OB_NOT_SUPPORTED; LOG_WARN("repeated data not support"); } else if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) { - MEMCPY(pointer_cast(int64_vec->get_data()), values.get_data(), sizeof(int64_t) * row_count_); + /* append after the row_offset_ rows already loaded into this batch */ MEMCPY(pointer_cast(int64_vec->get_data()) + row_offset_, values.get_data(), sizeof(int64_t) * row_count_); } else { int j = 0; for (int i = 0; i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - int64_vec->set_null(i); + int64_vec->set_null(i + row_offset_); } else { - int64_vec->set_int(i, values.at(j++)); + int64_vec->set_int(i + row_offset_, values.at(j++)); } } } @@ -1007,9 +1036,9 @@ int ObParquetTableRowIterator::DataLoader::load_date_col_to_datetime() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { - dec_vec->set_datetime(i, values.at(j++) * USECS_PER_DAY); + dec_vec->set_datetime(i + row_offset_, values.at(j++) * USECS_PER_DAY); } } } @@ -1032,9 +1061,9 @@ int ObParquetTableRowIterator::DataLoader::load_time_millis_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i
< row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { - dec_vec->set_time(i, values.at(j++) * USECS_PER_MSEC); + dec_vec->set_time(i + row_offset_, values.at(j++) * USECS_PER_MSEC); } } } @@ -1057,9 +1086,9 @@ int ObParquetTableRowIterator::DataLoader::load_time_nanos_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { - dec_vec->set_time(i, values.at(j++) / NSECS_PER_USEC); + dec_vec->set_time(i + row_offset_, values.at(j++) / NSECS_PER_USEC); } } } @@ -1111,15 +1140,15 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_millis_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { int64_t adjusted_value = values.at(j++) * USECS_PER_MSEC + adjust_us; if (ObTimestampType == file_col_expr_->datum_meta_.type_) { - dec_vec->set_timestamp(i, adjusted_value); + dec_vec->set_timestamp(i + row_offset_, adjusted_value); } else { ObOTimestampData data; data.time_us_ = adjusted_value; - dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data)); + dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data)); } } } @@ -1144,15 +1173,15 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_micros_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { int64_t adjusted_value = (values.at(j++) + adjust_us); if (ObTimestampType == 
file_col_expr_->datum_meta_.type_) { - dec_vec->set_timestamp(i, adjusted_value); + dec_vec->set_timestamp(i + row_offset_, adjusted_value); } else { ObOTimestampData data; data.time_us_ = adjusted_value; - dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data)); + dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data)); } } } @@ -1177,16 +1206,16 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_nanos_col() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { if (ObTimestampType == file_col_expr_->datum_meta_.type_) { - dec_vec->set_timestamp(i, values.at(j++) / NSECS_PER_USEC + adjust_us); + dec_vec->set_timestamp(i + row_offset_, values.at(j++) / NSECS_PER_USEC + adjust_us); } else { ObOTimestampData data; int64_t cur_value = values.at(j++); data.time_us_ = cur_value / NSECS_PER_USEC + adjust_us; data.time_ctx_.set_tail_nsec(cur_value % NSECS_PER_USEC); - dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data)); + dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data)); } } } @@ -1211,19 +1240,19 @@ int ObParquetTableRowIterator::DataLoader::load_timestamp_hive() int j = 0; for (int i = 0; OB_SUCC(ret) && i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - file_col_expr_->get_vector(eval_ctx_)->set_null(i); + dec_vec->set_null(i + row_offset_); } else { parquet::Int96 &value = values.at(j++); uint64_t nsec_time_value = ((uint64_t)value.value[1] << 32) + (uint64_t)value.value[0]; uint32_t julian_date_value = value.value[2]; int64_t utc_timestamp =((int64_t)julian_date_value - 2440588LL) * 86400000000LL + (int64_t)(nsec_time_value / NSECS_PER_USEC); if (ObTimestampType == file_col_expr_->datum_meta_.type_) { - 
dec_vec->set_timestamp(i, utc_timestamp + adjust_us); + dec_vec->set_timestamp(i + row_offset_, utc_timestamp + adjust_us); } else { ObOTimestampData data; data.time_us_ = utc_timestamp + adjust_us; data.time_ctx_.set_tail_nsec((int32_t)(nsec_time_value % NSECS_PER_USEC)); - dec_vec->set_otimestamp_tiny(i, ObOTimestampTinyData().from_timestamp_data(data)); + dec_vec->set_otimestamp_tiny(i + row_offset_, ObOTimestampTinyData().from_timestamp_data(data)); } } } @@ -1250,14 +1279,14 @@ int ObParquetTableRowIterator::DataLoader::load_float() ret = OB_NOT_SUPPORTED; LOG_WARN("repeated data not support"); } else if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) { - MEMCPY(pointer_cast(float_vec->get_data()), values.get_data(), sizeof(float) * row_count_); + MEMCPY(pointer_cast(float_vec->get_data()) + row_offset_, values.get_data(), sizeof(float) * row_count_); } else { int j = 0; for (int i = 0; i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - float_vec->set_null(i); + float_vec->set_null(i + row_offset_); } else { - float_vec->set_float(i, values.at(j++)); + float_vec->set_float(i + row_offset_, values.at(j++)); } } } @@ -1284,14 +1313,14 @@ int ObParquetTableRowIterator::DataLoader::load_double() ret = OB_NOT_SUPPORTED; LOG_WARN("repeated data not support"); } else if (IS_PARQUET_COL_NOT_NULL && values_cnt == row_count_) { - MEMCPY(pointer_cast(double_vec->get_data()), values.get_data(), sizeof(double) * row_count_); + MEMCPY(pointer_cast(double_vec->get_data()) + row_offset_, values.get_data(), sizeof(double) * row_count_); } else { int j = 0; for (int i = 0; i < row_count_; i++) { if (IS_PARQUET_COL_VALUE_IS_NULL(def_levels_buf_.at(i))) { - double_vec->set_null(i); + double_vec->set_null(i + row_offset_); } else { - double_vec->set_double(i, values.at(j++)); + double_vec->set_double(i + row_offset_, values.at(j++)); } } } @@ -1321,23 +1350,33 @@ int ObParquetTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) 
if (!file_column_exprs_.count()) { read_count = std::min(capacity, state_.cur_row_group_row_count_ - state_.cur_row_group_read_row_count_); } else { + str_res_mem_.reuse(); try { //load vec data from parquet file to file column expr for (int i = 0; OB_SUCC(ret) && i < file_column_exprs_.count(); ++i) { - if (OB_UNLIKELY(!column_readers_.at(i).get()->HasNext())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("page end unexpected", K(ret)); - } - if (OB_SUCC(ret)) { + int64_t load_row_count = 0; + OZ (file_column_exprs_.at(i)->init_vector_for_write( + eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_)); + while (OB_SUCC(ret) && load_row_count < capacity && column_readers_.at(i).get()->HasNext()) { + int64_t temp_row_count = 0; DataLoader loader(eval_ctx, file_column_exprs_.at(i), column_readers_.at(i).get(), - def_levels_buf_, rep_levels_buf_, capacity, read_count); + def_levels_buf_, rep_levels_buf_, str_res_mem_, + capacity - load_row_count, load_row_count, temp_row_count); MEMSET(def_levels_buf_.get_data(), 0, sizeof(def_levels_buf_.at(0)) * eval_ctx.max_batch_size_); MEMSET(rep_levels_buf_.get_data(), 0, sizeof(rep_levels_buf_.at(0)) * eval_ctx.max_batch_size_); - OZ (file_column_exprs_.at(i)->init_vector_for_write( - eval_ctx, file_column_exprs_.at(i)->get_default_res_format(), eval_ctx.max_batch_size_)); OZ (loader.load_data_for_col(load_funcs_.at(i))); - file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx); + load_row_count += temp_row_count; } + if (OB_SUCC(ret)) { + if (0 == read_count) { + read_count = load_row_count; + } else { + if (read_count != load_row_count) { + ret = OB_ERR_UNEXPECTED; + } + } + } + file_column_exprs_.at(i)->set_evaluated_projected(eval_ctx); } } catch(const std::exception& e) { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/engine/table/ob_parquet_table_row_iter.h b/src/sql/engine/table/ob_parquet_table_row_iter.h index 6f6686220..4e0de58ae 100644 --- 
a/src/sql/engine/table/ob_parquet_table_row_iter.h +++ b/src/sql/engine/table/ob_parquet_table_row_iter.h @@ -156,15 +156,19 @@ private: parquet::ColumnReader *reader, common::ObIArrayWrap &def_levels_buf, common::ObIArrayWrap &rep_levels_buf, + common::ObIAllocator &str_res_mem, const int64_t batch_size, + const int64_t row_offset, int64_t &row_count): eval_ctx_(eval_ctx), file_col_expr_(file_col_expr), reader_(reader), batch_size_(batch_size), + row_offset_(row_offset), row_count_(row_count), def_levels_buf_(def_levels_buf), - rep_levels_buf_(rep_levels_buf) + rep_levels_buf_(rep_levels_buf), + str_res_mem_(str_res_mem) {} typedef int (DataLoader::*LOAD_FUNC)(); static LOAD_FUNC select_load_function(const ObDatumMeta &datum_type, @@ -202,9 +206,11 @@ private: ObExpr *file_col_expr_; parquet::ColumnReader *reader_; const int64_t batch_size_; + const int64_t row_offset_; int64_t &row_count_; common::ObIArrayWrap &def_levels_buf_; common::ObIArrayWrap &rep_levels_buf_; + common::ObIAllocator &str_res_mem_; }; private: int next_file(); @@ -215,6 +221,7 @@ private: StateValues state_; lib::ObMemAttr mem_attr_; ObArenaAllocator allocator_; + ObArenaAllocator str_res_mem_; ObArrowMemPool arrow_alloc_; parquet::ReaderProperties read_props_; ObExternalDataAccessDriver data_access_driver_;