修复 ODPS cpp 中每轮未将计数归零的问题，并使 cpp 实现适配向量化 2.0

This commit is contained in:
qingzhu521 2024-12-24 06:46:04 +00:00 committed by ob-robot
parent e23cf4a02a
commit ceebb92d04
2 changed files with 28 additions and 15 deletions

View File

@ -872,7 +872,9 @@ int ObODPSTableRowIterator::fill_partition_list_data(ObExpr &expr, int64_t retur
ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx();
ObDatum *datums = expr.locate_batch_datums(ctx);
ObObjType type = expr.obj_meta_.get_type();
if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) {
if (OB_FAIL(expr.init_vector_for_write(ctx, VEC_UNIFORM, returned_row_cnt))) {
LOG_WARN("failed to init expr vector", K(ret), K(expr));
} else if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) {
for (int64_t row_idx = 0; OB_SUCC(ret) && row_idx < returned_row_cnt; ++row_idx) {
int64_t loc_idx = expr.extra_ - 1;
if (OB_UNLIKELY(loc_idx < 0 || loc_idx >= state_.part_list_val_.get_count())) {
@ -995,15 +997,6 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
ObEvalCtx &ctx = scan_param_->op_->get_eval_ctx();
const ExprFixedArray &file_column_exprs = *(scan_param_->ext_file_column_exprs_);
if (0 == column_names_.size()) {
count = std::min(capacity, state_.step_ - state_.count_);
total_count_ += count;
state_.count_ += count;
for (int64_t column_idx = 0; OB_SUCC(ret) && state_.task_idx_ != -1 && column_idx < target_column_id_list_.count(); ++column_idx) {
ObExpr &expr = *file_column_exprs.at(column_idx);
if (OB_FAIL(fill_partition_list_data(expr, count))) {
LOG_WARN("failed to fill partition list data", K(ret), K(file_column_exprs.count()));
}
}
if (OB_SUCC(ret) &&
state_.count_ >= state_.step_ &&
OB_FAIL(next_task())) {
@ -1012,6 +1005,16 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
} else if (0 != count){
ret = OB_SUCCESS;
}
} else {
count = std::min(capacity, state_.step_ - state_.count_);
total_count_ += count;
state_.count_ += count;
for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) {
ObExpr &expr = *file_column_exprs.at(column_idx);
if (OB_FAIL(fill_partition_list_data(expr, count))) {
LOG_WARN("failed to fill partition list data", K(ret), K(file_column_exprs.count()));
}
}
}
} else if (state_.count_ >= state_.step_ && OB_FAIL(next_task())) {
if (OB_ITER_END != ret) {
@ -1019,6 +1022,7 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
} else {
LOG_TRACE("get next task end", K(ret), K(state_));
}
count = 0;
} else {
int64_t returned_row_cnt = 0;
try {
@ -1072,17 +1076,21 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
} else if (0 == returned_row_cnt) {
// do nothing
LOG_TRACE("expected result: already retried reading task successfully", K(total_count_), K(returned_row_cnt), K(state_), K(ret));
count = 0;
} else {
int64_t data_idx = 0;
for (int64_t column_idx = 0; OB_SUCC(ret) && column_idx < target_column_id_list_.count(); ++column_idx) {
uint32_t target_idx = target_column_id_list_.at(column_idx);
ObExpr &expr = *file_column_exprs.at(column_idx);
ObDatum *datums = expr.locate_batch_datums(ctx);
ObObjType type = expr.obj_meta_.get_type();
ObDatum *datums = expr.locate_batch_datums(ctx);
if (expr.type_ == T_PSEUDO_PARTITION_LIST_COL) {
if (OB_FAIL(fill_partition_list_data(expr, returned_row_cnt))) {
LOG_WARN("failed to fill partition list data", K(ret));
}
} else if (OB_FAIL(expr.init_vector_for_write(ctx, VEC_UNIFORM, returned_row_cnt))) {
LOG_WARN("failed to init expr vector", K(ret), K(expr));
} else {
apsara::odps::sdk::ODPSColumnType odps_type = column_list_.at(target_idx).type_info_.mType;
target_idx = data_idx++;
@ -1645,7 +1653,7 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
}
}
}
OZ(calc_exprs_for_rowid(count));
if (OB_SUCC(ret)) {
ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx);
batch_info_guard.set_batch_idx(0);
@ -1661,8 +1669,10 @@ int ObODPSTableRowIterator::get_next_rows(int64_t &count, int64_t capacity)
column_convert_expr->locate_batch_datums(ctx), sizeof(ObDatum) * count);
column_expr->set_evaluated_flag(ctx);
}
OZ(column_expr->init_vector(ctx, VEC_UNIFORM, count));
}
}
OZ(calc_exprs_for_rowid(count));
return ret;
}

View File

@ -14419,9 +14419,12 @@ int ObDMLResolver::check_insert_into_select_use_fast_column_convert(const ObColu
} else if (source_base_table_schema->is_external_table()) {
ObArenaAllocator alloc;
ObExternalFileFormat format;
if (OB_FAIL(format.load_from_string(source_base_table_schema->get_external_file_format(), alloc))) {
LOG_WARN("load from string failed", K(ret));
} else if (format.format_type_ == ObExternalFileFormat::CSV_FORMAT) {
const ObString &format_or_properties = source_base_table_schema->get_external_file_format().empty() ?
source_base_table_schema->get_external_properties() :
source_base_table_schema->get_external_file_format();
if (OB_FAIL(format.load_from_string(format_or_properties, alloc))) {
LOG_WARN("load from string failed", K(ret), K(format_or_properties.length()), K(format_or_properties));
} else if (format.format_type_ == ObExternalFileFormat::CSV_FORMAT || format.format_type_ == ObExternalFileFormat::ODPS_FORMAT) {
fast_calc = true;
}
}