From cd2b8373c28fe342a62cb2a7fa15a5637ade61e5 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Mon, 19 Aug 2019 12:23:43 +0800 Subject: [PATCH] Fix Stream load double NumberTotalRows (#1664) --- be/src/exec/base_scanner.h | 4 --- be/src/exec/broker_scan_node.cpp | 1 - be/src/exec/broker_scanner.cpp | 59 -------------------------------- be/src/exec/broker_scanner.h | 6 ---- be/src/exec/parquet_scanner.cpp | 1 - be/src/exec/tablet_sink.cpp | 4 ++- 6 files changed, 3 insertions(+), 72 deletions(-) diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 884df553b0..748ad9c4b3 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -35,14 +35,10 @@ class ExprContext; struct ScannerCounter { ScannerCounter() : - num_rows_total(0), - // num_rows_returned(0), num_rows_filtered(0), num_rows_unselected(0) { } - int64_t num_rows_total; // total read rows (read from source) - // int64_t num_rows_returned; // qualified rows (match the dest schema) int64_t num_rows_filtered; // unqualified rows (unmatch the dest schema, or no partition) int64_t num_rows_unselected; // rows filterd by predicates }; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 358a9bb391..e1623dae35 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -433,7 +433,6 @@ void BrokerScanNode::scanner_worker(int start_idx, int length) { } // Update stats - _runtime_state->update_num_rows_load_total(counter.num_rows_total); _runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered); _runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected); diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 2fc7ff4229..08c590622a 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -97,7 +97,6 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - _counter->num_rows_total++; if (convert_one_row(Slice(ptr, size), tuple, tuple_pool)) { break; } @@ -384,64 +383,6 @@ bool is_null(const Slice& slice) { slice.data[1] == 'N'; } -// Writes a slot in _tuple from an value containing text data. -bool BrokerScanner::write_slot( - const std::string& column_name, const TColumnType& column_type, - const Slice& value, const SlotDescriptor* slot, - Tuple* tuple, MemPool* tuple_pool, - std::stringstream* error_msg) { - - if (value.size == 0 && !slot->type().is_string_type()) { - (*error_msg) << "the length of input should not be 0. " - << "column_name: " << column_name << "; " - << "type: " << slot->type(); - return false; - } - - char* value_to_convert = value.data; - size_t value_to_convert_length = value.size; - - // Fill all the spaces if it is 'TYPE_CHAR' type - if (slot->type().is_string_type()) { - int char_len = column_type.len; - if (value.size > char_len) { - (*error_msg) << "the length of input is too long than schema. " - << "column_name: " << column_name << "; " - << "input_str: [" << value.to_string() << "] " - << "type: " << slot->type() << "; " - << "schema length: " << char_len << "; " - << "actual length: " << value.size << "; "; - return false; - } - if (slot->type().type == TYPE_CHAR && value.size < char_len) { - if (!is_null(value)) { - fill_fix_length_string( - value, tuple_pool, - &value_to_convert, char_len); - value_to_convert_length = char_len; - } - } - } else if (slot->type().is_decimal_type()) { - bool is_success = check_decimal_input( - value, column_type.precision, column_type.scale, error_msg); - if (is_success == false) { - return false; - } - } - - if (!_text_converter->write_slot( - slot, tuple, value_to_convert, value_to_convert_length, - true, false, tuple_pool)) { - (*error_msg) << "convert csv string to " - << slot->type() << " failed. " - << "column_name: " << column_name << "; " - << "input_str: [" << value.to_string() << "]; "; - return false; - } - - return true; -} - // Convert one row to this tuple bool BrokerScanner::convert_one_row( const Slice& line, diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 9fa8c824c0..1eb11221a5 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -81,12 +81,6 @@ private: void split_line( const Slice& line, std::vector* values); - // Writes a slot in _tuple from an value containing text data. - bool write_slot( - const std::string& column_name, const TColumnType& column_type, - const Slice& value, const SlotDescriptor* slot, - Tuple* tuple, MemPool* tuple_pool, std::stringstream* error_msg); - void fill_fix_length_string( const Slice& value, MemPool* pool, char** new_value_p, int new_value_length); diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index b0e385560f..41b71f1feb 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -76,7 +76,6 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) { { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - _counter->num_rows_total++; if (fill_dest_tuple(Slice(), tuple, tuple_pool)) { break;// break iff true } diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 580678d4ea..9b4d35cf77 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -618,7 +618,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) { COUNTER_SET(_validate_data_timer, _validate_data_ns); COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns); COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns); - state->update_num_rows_load_total(_number_input_rows); + // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node + int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected(); + state->update_num_rows_load_total(num_rows_load_total); state->update_num_rows_load_filtered(_number_filtered_rows); // print log of add batch time of all node, for tracing load performance easily