[Bug](ODBC) fix vectorized null value error report in odbc scan node (#11420)
* [Bug](ODBC) fix vectorized null value error report in odbc scan node Co-authored-by: lihaopeng <lihaopeng@baidu.com>
This commit is contained in:
@ -201,8 +201,10 @@ Status OdbcScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
|
||||
slot_desc->col_name());
|
||||
}
|
||||
} else if (column_data.strlen_or_ind > column_data.buffer_length) {
|
||||
return Status::InternalError("nonnull column contains nullptr. table={}, column={}",
|
||||
_table_name, slot_desc->col_name());
|
||||
return Status::InternalError(
|
||||
"column value length longer than buffer length. "
|
||||
"table={}, column={}, buffer_length",
|
||||
_table_name, slot_desc->col_name(), column_data.buffer_length);
|
||||
} else {
|
||||
RETURN_IF_ERROR(write_text_slot(static_cast<char*>(column_data.target_value_ptr),
|
||||
column_data.strlen_or_ind, slot_desc, state));
|
||||
|
||||
@ -310,15 +310,14 @@ inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
|
||||
break;
|
||||
}
|
||||
|
||||
if (parse_result == StringParser::PARSE_FAILURE) {
|
||||
if (UNLIKELY(parse_result == StringParser::PARSE_FAILURE)) {
|
||||
if (true == slot_desc->is_nullable()) {
|
||||
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
|
||||
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
|
||||
size_t size = nullable_column->get_null_map_data().size();
|
||||
doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data();
|
||||
null_map_data[size - 1] = 1;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -107,19 +107,6 @@ Status VOdbcScanNode::open(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VOdbcScanNode::write_text_slot(char* value, int value_length, SlotDescriptor* slot,
|
||||
RuntimeState* state) {
|
||||
if (!_text_converter->write_slot(slot, _tuple, value, value_length, true, false,
|
||||
_tuple_pool.get())) {
|
||||
std::stringstream ss;
|
||||
ss << "Fail to convert odbc value:'" << value << "' to " << slot->type() << " on column:`"
|
||||
<< slot->col_name() + "`";
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VOdbcScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
|
||||
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VOdbcScanNode::get_next");
|
||||
VLOG_CRITICAL << get_scan_node_type() << "::GetNext";
|
||||
@ -173,7 +160,6 @@ Status VOdbcScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
|
||||
}
|
||||
|
||||
// Read one row from reader
|
||||
|
||||
for (int column_index = 0, materialized_column_index = 0; column_index < column_size;
|
||||
++column_index) {
|
||||
auto slot_desc = tuple_desc->slots()[column_index];
|
||||
@ -186,12 +172,27 @@ Status VOdbcScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
|
||||
char* value_data = static_cast<char*>(column_data.target_value_ptr);
|
||||
int value_len = column_data.strlen_or_ind;
|
||||
|
||||
if (!text_converter->write_column(slot_desc, &columns[column_index], value_data,
|
||||
value_len, true, false)) {
|
||||
std::stringstream ss;
|
||||
ss << "Fail to convert odbc value:'" << value_data << "' to "
|
||||
<< slot_desc->type() << " on column:`" << slot_desc->col_name() + "`";
|
||||
return Status::InternalError(ss.str());
|
||||
if (value_len == SQL_NULL_DATA) {
|
||||
if (slot_desc->is_nullable()) {
|
||||
columns[column_index]->insert_default();
|
||||
} else {
|
||||
return Status::InternalError(
|
||||
"nonnull column contains nullptr. table={}, column={}", _table_name,
|
||||
slot_desc->col_name());
|
||||
}
|
||||
} else if (value_len > column_data.buffer_length) {
|
||||
return Status::InternalError(
|
||||
"column value length longer than buffer length. "
|
||||
"table={}, column={}, buffer_length",
|
||||
_table_name, slot_desc->col_name(), column_data.buffer_length);
|
||||
} else {
|
||||
if (!text_converter->write_column(slot_desc, &columns[column_index], value_data,
|
||||
value_len, true, false)) {
|
||||
std::stringstream ss;
|
||||
ss << "Fail to convert odbc value:'" << value_data << "' to "
|
||||
<< slot_desc->type() << " on column:`" << slot_desc->col_name() + "`";
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
}
|
||||
materialized_column_index++;
|
||||
}
|
||||
|
||||
@ -57,11 +57,6 @@ protected:
|
||||
virtual void debug_string(int indentation_level, std::stringstream* out) const override;
|
||||
|
||||
private:
|
||||
// Writes a slot in tuple from an MySQL value containing text data.
|
||||
// The Odbc value is converted into the appropriate target type.
|
||||
Status write_text_slot(char* value, int value_length, SlotDescriptor* slot,
|
||||
RuntimeState* state);
|
||||
|
||||
bool _is_init;
|
||||
|
||||
std::string _scan_node_type;
|
||||
@ -87,8 +82,6 @@ private:
|
||||
ODBCConnectorParam _odbc_param;
|
||||
// Helper class for converting text to other types;
|
||||
std::unique_ptr<TextConverter> _text_converter;
|
||||
// Current tuple.
|
||||
doris::Tuple* _tuple = nullptr;
|
||||
};
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user