[fix](csv-reader) fix bug that csv reader cannot read text format hms table (#13515)

1. The field and line delimiters were missing when scanning Hive text tables; set them from the table's serde properties, with defaults.
2. When querying an external table in text (csv) format, we must pass the column position map to the BE,
    otherwise the column order is wrong (see the sketch after the TODO list below).

TODO:
1. For now, if we query a csv file for a column that does not exist in it, null is returned.
    It should instead return null or the default value of that column.
2. Add regression tests once the Hive docker environment is ready.
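To illustrate the second point above (this sketch is not part of the patch; names like `col_idxs` and `split_line` are made up for illustration): the FE sends, for each queried slot, its column position in the file, and the BE uses that map when filling destination columns, substituting `\N` (NULL) for positions that do not exist in the file.

```cpp
#include <iostream>
#include <string>
#include <vector>

// Split one csv line by a single-character separator (illustration only).
static std::vector<std::string> split_line(const std::string& line, char sep) {
    std::vector<std::string> values;
    size_t start = 0;
    while (true) {
        size_t pos = line.find(sep, start);
        values.push_back(line.substr(start, pos == std::string::npos ? pos : pos - start));
        if (pos == std::string::npos) break;
        start = pos + 1;
    }
    return values;
}

int main() {
    // The query selects k1, k2, k3, whose positions in the file are 0, 3 and 5.
    std::vector<int> col_idxs = {0, 3, 5};
    std::vector<std::string> split_values = split_line("a|x|y|b|z|c", '|');

    for (size_t i = 0; i < col_idxs.size(); ++i) {
        size_t idx = static_cast<size_t>(col_idxs[i]);
        // An out-of-range index means the column does not exist in the file: fill with \N (NULL).
        const std::string value = idx < split_values.size() ? split_values[idx] : "\\N";
        std::cout << "slot " << i << " -> " << value << "\n";
    }
    return 0;
}
```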
Mingyu Chen
2022-10-22 22:40:03 +08:00
committed by GitHub
parent a7c221d04e
commit 3a3def447d
11 changed files with 147 additions and 57 deletions

View File

@@ -194,7 +194,7 @@ inline bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
size_t len, bool copy_string, bool need_escape) {
vectorized::IColumn* col_ptr = nullable_col_ptr;
// \N means it's NULL
if (true == slot_desc->is_nullable()) {
if (slot_desc->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
nullable_column->insert_data(nullptr, 0);
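For context, the null convention this converter relies on is that a field consisting of exactly `\N` denotes NULL. A minimal standalone check of that convention (illustrative only, not the Doris helper; the extra `SQL_NULL_DATA` length check is omitted):

```cpp
#include <cassert>
#include <cstddef>

// A text field is NULL when it is exactly the two characters '\' and 'N'.
static bool is_null_text(const char* data, size_t len) {
    return len == 2 && data[0] == '\\' && data[1] == 'N';
}

int main() {
    assert(is_null_text("\\N", 2));
    assert(!is_null_text("N", 1));
    assert(!is_null_text("\\Nx", 3));
    return 0;
}
```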

View File

@@ -33,6 +33,9 @@
#include "vec/exec/scan/vfile_scanner.h"
namespace doris::vectorized {
const static Slice _s_null_slice = Slice("\\N");
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs)
@@ -57,7 +60,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounte
CsvReader::~CsvReader() {}
Status CsvReader::init_reader() {
Status CsvReader::init_reader(bool is_load) {
// set the skip lines and start offset
int64_t start_offset = _range.start_offset;
if (start_offset == 0 && _params.__isset.file_attributes &&
@@ -124,20 +127,32 @@ Status CsvReader::init_reader() {
_file_format_type);
}
_is_load = is_load;
if (!_is_load) {
// For query task, we need to save the mapping from table schema to file column
DCHECK(_params.__isset.column_idxs);
_col_idxs = _params.column_idxs;
} else {
// For load task, the column order is the same as the file column order
int i = 0;
for (auto& desc [[maybe_unused]] : _file_slot_descs) {
_col_idxs.push_back(i++);
}
}
_line_reader_eof = false;
return Status::OK();
}
Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_line_reader_eof == true) {
if (_line_reader_eof) {
*eof = true;
return Status::OK();
}
const int batch_size = _state->batch_size();
auto columns = block->mutate_columns();
while (columns[0]->size() < batch_size && !_line_reader_eof) {
size_t rows = 0;
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof));
@@ -150,16 +165,11 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
continue;
}
// TODO(ftw): check read_rows?
++(*read_rows);
RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns));
if (_line_reader_eof == true) {
*eof = true;
break;
}
RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, &rows));
}
columns.clear();
*eof = (rows == 0);
*read_rows = rows;
return Status::OK();
}
@@ -228,7 +238,7 @@ Status CsvReader::_create_decompressor() {
return Status::OK();
}
Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns) {
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* rows) {
bool is_success = false;
RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
@@ -240,53 +250,67 @@ Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColum
// if _split_values.size > _file_slot_descs.size()
// we only take the first few columns
for (int i = 0; i < _file_slot_descs.size(); ++i) {
// TODO(ftw): no need of src_slot_desc
auto src_slot_desc = _file_slot_descs[i];
const Slice& value = _split_values[i];
_text_converter->write_string_column(src_slot_desc, &columns[i], value.data, value.size);
int col_idx = _col_idxs[i];
// col idx is out of range, fill with null.
const Slice& value =
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
IColumn* col_ptr = const_cast<IColumn*>(block->get_by_position(i).column.get());
_text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
false);
}
++(*rows);
return Status::OK();
}
Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
if (!validate_utf8(line.data, line.size)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[]() -> std::string { return "Unable to display"; },
[]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", "Unable to display");
return fmt::to_string(error_msg);
},
&_line_reader_eof));
_counter->num_rows_filtered++;
*success = false;
return Status::OK();
if (!_is_load) {
return Status::InternalError("Only support csv data in utf8 codec");
} else {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[]() -> std::string { return "Unable to display"; },
[]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{}", "Unable to display");
return fmt::to_string(error_msg);
},
&_line_reader_eof));
_counter->num_rows_filtered++;
*success = false;
return Status::OK();
}
}
_split_line(line);
// if actual column number in csv file is not equal to _file_slot_descs.size()
// then filter this line.
if (_split_values.size() != _file_slot_descs.size()) {
std::string cmp_str =
_split_values.size() > _file_slot_descs.size() ? "more than" : "less than";
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return std::string(line.data, line.size); },
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{} {} {}", "actual column number in csv file is ",
cmp_str, " schema column number.");
fmt::format_to(error_msg, "actual number: {}, column separator: [{}], ",
_split_values.size(), _value_separator);
fmt::format_to(error_msg, "line delimiter: [{}], schema column number: {}; ",
_line_delimiter, _file_slot_descs.size());
return fmt::to_string(error_msg);
},
&_line_reader_eof));
_counter->num_rows_filtered++;
*success = false;
return Status::OK();
if (_is_load) {
// Only check for load task. For query task, a non-existent column will be filled with null.
// if actual column number in csv file is not equal to _file_slot_descs.size()
// then filter this line.
if (_split_values.size() != _file_slot_descs.size()) {
std::string cmp_str =
_split_values.size() > _file_slot_descs.size() ? "more than" : "less than";
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string { return std::string(line.data, line.size); },
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg, "{} {} {}",
"actual column number in csv file is ", cmp_str,
" schema column number.");
fmt::format_to(error_msg, "actual number: {}, column separator: [{}], ",
_split_values.size(), _value_separator);
fmt::format_to(error_msg,
"line delimiter: [{}], schema column number: {}; ",
_line_delimiter, _file_slot_descs.size());
return fmt::to_string(error_msg);
},
&_line_reader_eof));
_counter->num_rows_filtered++;
*success = false;
return Status::OK();
}
}
*success = true;
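The reworked `get_next_block` above follows a common batching pattern: keep reading lines until `batch_size` rows have been produced or the reader hits EOF, and report EOF to the caller only when no rows could be read at all. A simplified sketch with a toy line source (the `ToyLineReader` type is invented here; it is not the real LineReader API):

```cpp
#include <iostream>
#include <string>
#include <vector>

// Toy stand-in for the line reader: yields lines until the vector is exhausted.
struct ToyLineReader {
    std::vector<std::string> lines;
    size_t next = 0;
    bool read_line(std::string* out, bool* reader_eof) {
        if (next >= lines.size()) {
            *reader_eof = true;
            return false;
        }
        *out = lines[next++];
        *reader_eof = (next >= lines.size());
        return true;
    }
};

// Read up to batch_size rows; *eof is set only when nothing could be read,
// mirroring the "eof = (rows == 0)" logic in the patched get_next_block.
static size_t get_next_batch(ToyLineReader& reader, size_t batch_size, bool* eof) {
    size_t rows = 0;
    bool reader_eof = false;
    while (rows < batch_size && !reader_eof) {
        std::string line;
        if (!reader.read_line(&line, &reader_eof)) break;
        // ... fill destination columns from `line` here ...
        ++rows;
    }
    *eof = (rows == 0);
    return rows;
}

int main() {
    ToyLineReader reader{{"r1", "r2", "r3"}};
    bool eof = false;
    while (!eof) {
        size_t rows = get_next_batch(reader, 2, &eof);
        std::cout << "batch of " << rows << " rows, eof=" << eof << "\n";
    }
    return 0;
}
```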

View File

@@ -36,14 +36,14 @@ public:
const std::vector<SlotDescriptor*>& file_slot_descs);
~CsvReader() override;
Status init_reader();
Status init_reader(bool is_load);
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
private:
Status _create_decompressor();
Status _fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns);
Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows);
Status _line_split_to_values(const Slice& line, bool* success);
void _split_line(const Slice& line);
Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
@@ -57,6 +57,13 @@ private:
const TFileScanRangeParams& _params;
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
// Only for query task: save the indexes of the columns that need to be read.
// eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
// and the corresponding positions in the file are 0, 3, 5.
// So _col_idxs will be: <0, 3, 5>
std::vector<int> _col_idxs;
// True if this is a load task
bool _is_load = false;
// _file_reader_s is for stream load pipe reader,
// and _file_reader is for other file reader.

View File

@@ -492,7 +492,7 @@ Status VFileScanner::_get_next_reader() {
case TFileFormatType::FORMAT_CSV_DEFLATE: {
_cur_reader.reset(
new CsvReader(_state, _profile, &_counter, _params, range, _file_slot_descs));
init_status = ((CsvReader*)(_cur_reader.get()))->init_reader();
init_status = ((CsvReader*)(_cur_reader.get()))->init_reader(_is_load);
break;
}
default:

View File

@@ -810,6 +810,8 @@ public class HiveMetaStoreClientHelper {
return Type.FLOAT;
case "double":
return Type.DOUBLE;
case "string":
return Type.STRING;
default:
break;
}
@@ -923,3 +925,4 @@ public class HiveMetaStoreClientHelper {
return output.toString();
}
}

View File

@@ -73,6 +73,17 @@ public interface TableIf {
Column getColumn(String name);
default int getBaseColumnIdxByName(String colName) {
int i = 0;
for (Column col : getBaseSchema()) {
if (col.getName().equalsIgnoreCase(colName)) {
return i;
}
++i;
}
return -1;
}
String getMysqlType();
String getEngine();
@@ -163,3 +174,4 @@ public interface TableIf {
}
}
}

View File

@@ -311,3 +311,4 @@ public class HMSExternalTable extends ExternalTable {
return catalog.getCatalogProperty().getS3Properties();
}
}

View File

@@ -70,6 +70,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
client = new HiveMetaStoreClient(hiveConf);
} catch (MetaException e) {
LOG.warn("Failed to create HiveMetaStoreClient: {}", e.getMessage());
return;
}
List<String> allDatabases;
try {

View File

@@ -278,6 +278,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
ParamCreateContext context = contexts.get(i);
FileScanProviderIf scanProvider = scanProviders.get(i);
setDefaultValueExprs(scanProvider, context);
setColumnPositionMappingForTextFile(scanProvider, context);
finalizeParamsForLoad(context, analyzer);
createScanRangeLocations(context, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
@@ -285,6 +286,27 @@
}
}
private void setColumnPositionMappingForTextFile(FileScanProviderIf scanProvider, ParamCreateContext context)
throws UserException {
if (type != Type.QUERY) {
return;
}
TableIf tbl = scanProvider.getTargetTable();
List<Integer> columnIdxs = Lists.newArrayList();
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
}
String colName = slot.getColumn().getName();
int idx = tbl.getBaseColumnIdxByName(colName);
if (idx == -1) {
throw new UserException("Column " + colName + " not found in table " + tbl.getName());
}
columnIdxs.add(idx);
}
context.params.setColumnIdxs(columnIdxs);
}
protected void setDefaultValueExprs(FileScanProviderIf scanProvider, ParamCreateContext context)
throws UserException {
TableIf tbl = scanProvider.getTargetTable();
@@ -320,7 +342,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
default:
Preconditions.checkState(false, type);
}
// if slot desc is null, which mean it is a unrelated slot, just skip.
// if slot desc is null, which mean it is an unrelated slot, just skip.
// eg:
// (a, b, c) set (x=a, y=b, z=c)
// c does not exist in file, the z will be filled with null, even if z has default value.
@@ -499,5 +521,3 @@ public class ExternalFileScanNode extends ExternalScanNode {
}
}
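On the FE side (Java in the patch above), the `column_idxs` list is built by looking up each materialized slot's column name in the table's base schema, case-insensitively, and failing if a column is missing. A language-neutral sketch of that lookup, written in C++ for consistency with the other sketches here (all names are illustrative):

```cpp
#include <algorithm>
#include <cctype>
#include <stdexcept>
#include <string>
#include <vector>

// Case-insensitive comparison, mirroring Column#getName().equalsIgnoreCase(colName).
static bool equals_ignore_case(const std::string& a, const std::string& b) {
    return a.size() == b.size() &&
           std::equal(a.begin(), a.end(), b.begin(), [](unsigned char x, unsigned char y) {
               return std::tolower(x) == std::tolower(y);
           });
}

// Position of col_name in the table's base schema, or -1 if absent
// (the counterpart of TableIf.getBaseColumnIdxByName).
static int base_column_idx(const std::vector<std::string>& base_schema, const std::string& col_name) {
    for (size_t i = 0; i < base_schema.size(); ++i) {
        if (equals_ignore_case(base_schema[i], col_name)) return static_cast<int>(i);
    }
    return -1;
}

// Build the column_idxs list for the queried (materialized) slots.
static std::vector<int> build_column_idxs(const std::vector<std::string>& base_schema,
                                          const std::vector<std::string>& queried_slots) {
    std::vector<int> column_idxs;
    for (const auto& name : queried_slots) {
        int idx = base_column_idx(base_schema, name);
        if (idx == -1) throw std::runtime_error("Column " + name + " not found in table");
        column_idxs.push_back(idx);
    }
    return column_idxs;
}

int main() {
    std::vector<std::string> schema = {"k1", "c1", "c2", "k2", "c3", "k3"};
    std::vector<int> idxs = build_column_idxs(schema, {"K1", "k2", "k3"});
    // idxs == {0, 3, 5}
    return idxs == std::vector<int>({0, 3, 5}) ? 0 : 1;
}
```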

View File

@@ -35,11 +35,13 @@ import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.external.ExternalFileScanNode.ParamCreateContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
@@ -76,6 +78,10 @@ import java.util.stream.Collectors;
public class HiveScanProvider implements HMSTableScanProviderIf {
private static final Logger LOG = LogManager.getLogger(HiveScanProvider.class);
private static final String PROP_FIELD_DELIMITER = "field.delim";
private static final String DEFAULT_FIELD_DELIMITER = "|";
private static final String DEFAULT_LINE_DELIMITER = "\n";
protected HMSExternalTable hmsTable;
protected int inputSplitNum = 0;
@@ -268,7 +274,20 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
String fsName = fullPath.replace(filePath, "");
TFileType locationType = getLocationType();
context.params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
context.params.setFormatType(getFileFormatType());
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
String columnSeparator
= hmsTable.getRemoteTable().getSd().getSerdeInfo().getParameters()
.getOrDefault(PROP_FIELD_DELIMITER, DEFAULT_FIELD_DELIMITER);
textParams.setColumnSeparator(columnSeparator);
textParams.setLineDelimiter(DEFAULT_LINE_DELIMITER);
TFileAttributes fileAttributes = new TFileAttributes();
fileAttributes.setTextParams(textParams);
context.params.setFileAttributes(fileAttributes);
}
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (locationType == TFileType.FILE_HDFS) {
@@ -364,3 +383,4 @@ public class HiveScanProvider implements HMSTableScanProviderIf {
}
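The provider above reads the column separator from the Hive table's serde parameters (`field.delim`), falling back to a default when the property is absent, and always uses `\n` as the line delimiter. A small sketch of that "property with default" lookup (illustrative; the real code is Java and uses `Map#getOrDefault`):

```cpp
#include <iostream>
#include <map>
#include <string>

// Return the value stored under `key`, or `def` when it is not set.
static std::string get_or_default(const std::map<std::string, std::string>& params,
                                  const std::string& key, const std::string& def) {
    auto it = params.find(key);
    return it != params.end() ? it->second : def;
}

int main() {
    // Serde parameters of a Hive text table; "field.delim" may or may not be present.
    std::map<std::string, std::string> serde_params = {{"serialization.format", ","}};
    std::string column_separator = get_or_default(serde_params, "field.delim", "|");
    std::string line_delimiter = "\n";  // text tables always use "\n" here
    std::cout << "column separator: [" << column_separator << "]\n";
    return 0;
}
```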

View File

@@ -289,6 +289,8 @@ struct TFileScanRangeParams {
14: optional list<Types.TNetworkAddress> broker_addresses
15: optional TFileAttributes file_attributes
16: optional Exprs.TExpr pre_filter_exprs
// For csv query task, save the column index in the file, ordered by dest tuple
17: optional list<i32> column_idxs
}
struct TFileRangeDesc {