diff --git a/src/sql/engine/table/ob_odps_table_row_iter.cpp b/src/sql/engine/table/ob_odps_table_row_iter.cpp index cb472a135..2299039dd 100644 --- a/src/sql/engine/table/ob_odps_table_row_iter.cpp +++ b/src/sql/engine/table/ob_odps_table_row_iter.cpp @@ -872,7 +872,9 @@ int ObODPSTableRowIterator::fill_partition_list_data(ObExpr &expr, int64_t retur ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx(); ObDatum *datums = expr.locate_batch_datums(ctx); ObObjType type = expr.obj_meta_.get_type(); - if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) { + if (OB_FAIL(expr.init_vector_for_write(ctx, VEC_UNIFORM, returned_row_cnt))) { + LOG_WARN("failed to init expr vector", K(ret), K(expr)); + } else if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) { for (int64_t row_idx = 0; OB_SUCC(ret) && row_idx < returned_row_cnt; ++row_idx) { int64_t loc_idx = expr.extra_ - 1; if (OB_UNLIKELY(loc_idx < 0 || loc_idx >= state_.part_list_val_.get_count())) { @@ -995,15 +997,6 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx(); const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_); if (0 == column_names_.size()) { - count = std::min(capacity, state_.step_ - state_.count_); - total_count_ += count; - state_.count_ += count; - for (int64_t column_idx = 0; OB_SUCC(ret) && state_.task_idx_ != -1 && column_idx < target_column_id_list_.count(); ++column_idx) { - ObExpr &expr = *file_column_exprs.at(column_idx); - if (OB_FAIL(fill_partition_list_data(expr, count))) { - LOG_WARN("failed to fill partition list data", K(ret), K(file_column_exprs.count())); - } - } if (OB_SUCC(ret) && state_.count_ >= state_.step_ && OB_FAIL(next_task())) { @@ -1012,6 +1005,16 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) } else if (0 != count){ ret = OB_SUCCESS; } + } else { + count = std::min(capacity, state_.step_ - state_.count_); + total_count_ += count; + state_.count_ += count; + for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) { + ObExpr &expr = *file_column_exprs.at(column_idx); + if (OB_FAIL(fill_partition_list_data(expr, count))) { + LOG_WARN("failed to fill partition list data", K(ret), K(file_column_exprs.count())); + } + } } } else if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) { if (OB_ITER_END != ret) { @@ -1019,6 +1022,7 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) } else { LOG_TRACE("get next task end", K(ret), K(state_)); } + count = 0; } else { int64_t returned_row_cnt = 0; try { @@ -1072,17 +1076,21 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) } else if (0 == returned_row_cnt) { // do nothing LOG_TRACE("expected result: already retried reading task successfully", K(total_count_), K(returned_row_cnt), K(state_), K(ret)); + count = 0; } else { int64_t data_idx = 0; for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) { uint32_t target_idx = target_column_id_list_.at(column_idx); ObExpr &expr = *file_column_exprs.at(column_idx); - ObDatum *datums = expr.locate_batch_datums(ctx); + ObObjType type = expr.obj_meta_.get_type(); + ObDatum *datums = expr.locate_batch_datums(ctx); if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) { if (OB_FAIL(fill_partition_list_data(expr, returned_row_cnt))) { LOG_WARN("failed to fill partition list data", K(ret)); } + } else if (OB_FAIL(expr.init_vector_for_write(ctx, VEC_UNIFORM, returned_row_cnt))) { + LOG_WARN("failed to init expr vector", K(ret), K(expr)); } else { apsara::odps::sdk::ODPSColumnType odps_type = column_list_.at(target_idx).type_info_.mType; target_idx = data_idx++; @@ -1645,7 +1653,7 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) } } } - OZ(calc_exprs_for_rowid(count)); + if (OB_SUCC(ret)) { ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx); batch_info_guard.set_batch_idx(0); @@ -1661,8 +1669,10 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity) column_convert_expr->locate_batch_datums(ctx), sizeof(ObDatum) * count); column_expr->set_evaluated_flag(ctx); } + OZ(column_expr->init_vector(ctx, VEC_UNIFORM, count)); } } + OZ(calc_exprs_for_rowid(count)); return ret; } diff --git a/src/sql/resolver/dml/ob_dml_resolver.cpp b/src/sql/resolver/dml/ob_dml_resolver.cpp index a1721047b..762b31c4b 100755 --- a/src/sql/resolver/dml/ob_dml_resolver.cpp +++ b/src/sql/resolver/dml/ob_dml_resolver.cpp @@ -14419,9 +14419,12 @@ int ObDMLResolver::check_insert_into_select_use_fast_column_convert(const ObColu } else if (source_base_table_schema->is_external_table()) { ObArenaAllocator alloc; ObExternalFileFormat format; - if (OB_FAIL(format.load_from_string(source_base_table_schema->get_external_file_format(), alloc))) { - LOG_WARN("load from string failed", K(ret)); - } else if (format.format_type_ == ObExternalFileFormat::CSV_FORMAT) { + const ObString &format_or_properties = source_base_table_schema->get_external_file_format().empty() ? + source_base_table_schema->get_external_properties() : + source_base_table_schema->get_external_file_format(); + if (OB_FAIL(format.load_from_string(format_or_properties, alloc))) { + LOG_WARN("load from string failed", K(ret), K(format_or_properties.length()), K(format_or_properties)); + } else if (format.format_type_ == ObExternalFileFormat::CSV_FORMAT || format.format_type_ == ObExternalFileFormat::ODPS_FORMAT) { fast_calc = true; } }