[Load] Support multi bytes LineDelimiter and ColumnSeparator (#5462)

* [Internal][Support Multibytes Separator] doris-1079
support multi bytes LineDelimiter and ColumnSeparator
This commit is contained in:
Zhengguo Yang
2021-03-09 09:35:39 +08:00
committed by GitHub
parent a1160bcd99
commit e023ef5404
32 changed files with 240 additions and 135 deletions

View File

@ -34,13 +34,14 @@ namespace doris {
PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
Decompressor* decompressor, size_t length,
uint8_t line_delimiter)
const std::string& line_delimiter, size_t line_delimiter_length)
: _profile(profile),
_file_reader(file_reader),
_decompressor(decompressor),
_min_length(length),
_total_read_bytes(0),
_line_delimiter(line_delimiter),
_line_delimiter_length(line_delimiter_length),
_input_buf(new uint8_t[INPUT_CHUNK]),
_input_buf_size(INPUT_CHUNK),
_input_buf_pos(0),
@ -92,7 +93,7 @@ inline bool PlainTextLineReader::update_eof() {
uint8_t* PlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* start,
size_t len) {
// TODO: meanwhile find and save field pos
return (uint8_t*)memmem(start, len, &_line_delimiter, 1);
return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), _line_delimiter_length);
}
// extend input buf if necessary only when _more_input_bytes > 0
@ -195,10 +196,12 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
if (pos == nullptr) {
// didn't find line delimiter, read more data from decompressor
// 1. point 'offset' to _output_buf_limit
offset = output_buf_read_remaining();
// 2. read from file reader
// for multi bytes delimiter we cannot set offset to avoid incomplete
// delimiter
// read from file reader
if (_line_delimiter_length == 1) {
offset = output_buf_read_remaining();
}
extend_output_buf();
if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) {
// we still have data in input which is not decompressed.
@ -266,7 +269,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
if (_decompressor != nullptr) {
SCOPED_TIMER(_decompress_timer);
// 2. decompress
// decompress
size_t input_read_bytes = 0;
size_t decompressed_len = 0;
_more_input_bytes = 0;
@ -316,7 +319,7 @@ Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* e
// we found a complete line
// ready to return
offset = pos - cur_ptr;
found_line_delimiter = 1;
found_line_delimiter = _line_delimiter_length;
break;
}
} // while (!done())