[fix](array-type) fix the invalid format load for stream load (#12424)
This PR fixes handling of invalid array formats in stream load.
Before this change, loading a file that contains an invalid array value failed the whole stream load.
The original file to load:
1 [1, 2, 3]
2 [4, 5, 6]
3 \N
4 [7, \N, 8]
5 10, 11, 12
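The load goes through the stream load HTTP API. `curl_cmd.sh` is not included in this PR; as a rough sketch, and assuming an `example_db.array_test06` table like the one queried below, it could wrap a command along these lines (host, port, credentials and file name are placeholders):

#!/usr/bin/env bash
# Hypothetical stand-in for curl_cmd.sh -- FE host/port, user and file name
# are assumptions, not taken from the PR.
# Stream load's default column separator is '\t', which matches the file above;
# max_filter_ratio lets a malformed row be filtered instead of failing the
# whole load (the default ratio is 0).
curl --location-trusted -u root: \
    -H "label:array_load_$(date +%s)" \
    -H "max_filter_ratio:0.6" \
    -T ./array_data.csv \
    http://127.0.0.1:8030/api/example_db/array_test06/_stream_load

With the old code, the whole request fails even though only one row is malformed: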
[hugo@xafj-palo]$ sh curl_cmd.sh
{
"TxnId": 11035,
"Label": "11c9f111-188e-4616-9a50-aec8b7814513",
"TwoPhaseCommit": "false",
"Status": "Fail",
"Message": "Array does not start with '[' character, found '1'",
"NumberTotalRows": 0,
"NumberLoadedRows": 0,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 55,
"LoadTimeMs": 7,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 3,
"CommitAndPublishTimeMs": 0
}
After this change, the load succeeds and the response carries an error URL that reports the rejected line:
[hugo@xafj-palo]$ sh curl_cmd.sh
{
"TxnId": 11046,
"Label": "249808ee-55f4-4c08-b671-b3d82689d614",
"TwoPhaseCommit": "false",
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 5,
"NumberLoadedRows": 4,
"NumberFilteredRows": 1,
"NumberUnselectedRows": 0,
"LoadBytes": 55,
"LoadTimeMs": 39,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 2,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 19,
"CommitAndPublishTimeMs": 16,
"ErrorURL": "http://10.81.85.89:8502/api/_load_error_log?file=__shard_3/error_log_insert_stmt_8d4130f0c18aeb0a-ad7ffd4233c41893_8d4130f0c18aeb0a_ad7ffd4233c41893"
}
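Note the row accounting in the response: NumberTotalRows (5) = NumberLoadedRows (4) + NumberFilteredRows (1) + NumberUnselectedRows (0); this is exactly the invariant the updated regression test asserts below.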
The SQL select result:
MySQL [example_db]> select * from array_test06;
+------+--------------+
| k1 | k2 |
+------+--------------+
| 1 | [1, 2, 3] |
| 2 | [4, 5, 6] |
| 3 | NULL |
| 4 | [7, NULL, 8] |
+------+--------------+
4 rows in set (0.019 sec)
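The table DDL is not part of this PR; a minimal sketch of a schema consistent with the result above (key choice, bucketing and replication are assumptions) could be:

# Hypothetical DDL for array_test06; distribution and replication settings are
# guesses, not taken from the PR. On some versions the array type may need to
# be enabled in the FE/BE config first.
mysql -h 127.0.0.1 -P 9030 -uroot example_db <<'SQL'
CREATE TABLE IF NOT EXISTS array_test06 (
    k1 INT,
    k2 ARRAY<INT>
) DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 1
PROPERTIES("replication_num" = "1");
SQL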
The error URL page shows:
"Reason: Invalid format for array column(k2). src line [10, 11, 12]; "
Issue Number: #7570
@@ -487,4 +487,38 @@ void BaseScanner::_fill_columns_from_path() {
    }
}

bool BaseScanner::is_null(const Slice& slice) {
    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
}

bool BaseScanner::is_array(const Slice& slice) {
    return slice.size > 1 && slice.data[0] == '[' && slice.data[slice.size - 1] == ']';
}

bool BaseScanner::check_array_format(std::vector<Slice>& split_values) {
    // if not the array format, filter this line and return error url
    auto dest_slot_descs = _dest_tuple_desc->slots();
    for (int j = 0; j < split_values.size() && j < dest_slot_descs.size(); ++j) {
        auto dest_slot_desc = dest_slot_descs[j];
        if (!dest_slot_desc->is_materialized()) {
            continue;
        }
        const Slice& value = split_values[j];
        if (dest_slot_desc->type().is_array_type() && !is_null(value) && !is_array(value)) {
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
                    [&]() -> std::string { return std::string(value.data, value.size); },
                    [&]() -> std::string {
                        fmt::memory_buffer err_msg;
                        fmt::format_to(err_msg, "Invalid format for array column({})",
                                       dest_slot_desc->col_name());
                        return fmt::to_string(err_msg);
                    },
                    &_scanner_eof));
            _counter->num_rows_filtered++;
            return false;
        }
    }
    return true;
}

} // namespace doris
@@ -92,6 +92,10 @@ protected:
    Status _fill_dest_block(vectorized::Block* dest_block, bool* eof);
    virtual Status _init_src_block();

    bool is_null(const Slice& slice);
    bool is_array(const Slice& slice);
    bool check_array_format(std::vector<Slice>& split_values);

    RuntimeState* _state;
    const TBrokerScanRangeParams& _params;
@@ -389,10 +389,6 @@ bool BrokerScanner::check_decimal_input(const Slice& slice, int precision, int s
    return true;
}

bool is_null(const Slice& slice) {
    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
}

// Convert one row to this tuple
Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool,
                                       bool* fill_tuple) {
@@ -494,6 +490,10 @@ Status BrokerScanner::_line_to_src_tuple(const Slice& line) {
        return Status::OK();
    }

    if (!check_array_format(_split_values)) {
        return Status::OK();
    }

    for (int i = 0; i < _split_values.size(); ++i) {
        auto slot_desc = _src_slot_descs[i];
        const Slice& value = _split_values[i];
@@ -185,6 +185,8 @@ struct TypeDescriptor {

    bool is_collection_type() const { return type == TYPE_ARRAY || type == TYPE_MAP; }

    bool is_array_type() const { return type == TYPE_ARRAY; }

    /// Returns the byte size of this type. Returns 0 for variable length types.
    int get_byte_size() const { return ::doris::get_byte_size(type); }
@@ -30,10 +30,6 @@

namespace doris::vectorized {

bool is_null(const Slice& slice) {
    return slice.size == 2 && slice.data[0] == '\\' && slice.data[1] == 'N';
}

VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                               const TBrokerScanRangeParams& params,
                               const std::vector<TBrokerRangeDesc>& ranges,
@@ -95,6 +91,10 @@ Status VBrokerScanner::_fill_dest_columns(const Slice& line,
        return Status::OK();
    }

    if (!check_array_format(_split_values)) {
        return Status::OK();
    }

    int idx = 0;
    for (int i = 0; i < _split_values.size(); ++i) {
        int dest_index = idx++;
@@ -1,3 +1,5 @@
1/[1,2,3,4,5]/[32767,32768,32769]/[65534,65535,65536]/["a","b","c","d","e"]/["hello","world"]/["1991-01-01"]/["1991-01-01 00:00:00"]/[0.33,0.67]/[3.1415926,0.878787878]/[1,1.2,1.3]
2/[1,2,3,4,5]/[32767,32768,32769]/[65534,65535,65536]/["a","b","c","d","e"]/["hello","world"]/\N/\N/\N/\N/[1,\N,1.3]
3/\N/\N/\N/\N/\N/\N/\N/\N/\N/\N
4/1,2,3,4,5/\N/\N/\N/\N/\N/\N/\N/\N/\N
5/[1,2,3,4,5/\N/\N/\N/\N/\N/\N/\N/\N/\N
@@ -124,6 +124,7 @@ suite("test_array_load", "p0") {
        set 'where', where_expr
        set 'fuzzy_parse', fuzzy_flag
        set 'column_separator', column_sep
        set 'max_filter_ratio', '0.6'
        file file_name // import json file
        time 10000 // limit inflight 10s

@@ -136,7 +137,7 @@ suite("test_array_load", "p0") {
        log.info("Stream load result: ${result}".toString())
        def json = parseJson(result)
        assertEquals("success", json.Status.toLowerCase())
        assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows)
        assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows + json.NumberFilteredRows)
        assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
    }
}