Fix Stream load double NumberTotalRows (#1664)
This commit is contained in:
@ -35,14 +35,10 @@ class ExprContext;
|
||||
|
||||
struct ScannerCounter {
|
||||
ScannerCounter() :
|
||||
num_rows_total(0),
|
||||
// num_rows_returned(0),
|
||||
num_rows_filtered(0),
|
||||
num_rows_unselected(0) {
|
||||
}
|
||||
|
||||
int64_t num_rows_total; // total read rows (read from source)
|
||||
// int64_t num_rows_returned; // qualified rows (match the dest schema)
|
||||
int64_t num_rows_filtered; // unqualified rows (unmatch the dest schema, or no partition)
|
||||
int64_t num_rows_unselected; // rows filterd by predicates
|
||||
};
|
||||
|
||||
@ -433,7 +433,6 @@ void BrokerScanNode::scanner_worker(int start_idx, int length) {
|
||||
}
|
||||
|
||||
// Update stats
|
||||
_runtime_state->update_num_rows_load_total(counter.num_rows_total);
|
||||
_runtime_state->update_num_rows_load_filtered(counter.num_rows_filtered);
|
||||
_runtime_state->update_num_rows_load_unselected(counter.num_rows_unselected);
|
||||
|
||||
|
||||
@ -97,7 +97,6 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
|
||||
{
|
||||
COUNTER_UPDATE(_rows_read_counter, 1);
|
||||
SCOPED_TIMER(_materialize_timer);
|
||||
_counter->num_rows_total++;
|
||||
if (convert_one_row(Slice(ptr, size), tuple, tuple_pool)) {
|
||||
break;
|
||||
}
|
||||
@ -384,64 +383,6 @@ bool is_null(const Slice& slice) {
|
||||
slice.data[1] == 'N';
|
||||
}
|
||||
|
||||
// Writes a slot in _tuple from an value containing text data.
|
||||
bool BrokerScanner::write_slot(
|
||||
const std::string& column_name, const TColumnType& column_type,
|
||||
const Slice& value, const SlotDescriptor* slot,
|
||||
Tuple* tuple, MemPool* tuple_pool,
|
||||
std::stringstream* error_msg) {
|
||||
|
||||
if (value.size == 0 && !slot->type().is_string_type()) {
|
||||
(*error_msg) << "the length of input should not be 0. "
|
||||
<< "column_name: " << column_name << "; "
|
||||
<< "type: " << slot->type();
|
||||
return false;
|
||||
}
|
||||
|
||||
char* value_to_convert = value.data;
|
||||
size_t value_to_convert_length = value.size;
|
||||
|
||||
// Fill all the spaces if it is 'TYPE_CHAR' type
|
||||
if (slot->type().is_string_type()) {
|
||||
int char_len = column_type.len;
|
||||
if (value.size > char_len) {
|
||||
(*error_msg) << "the length of input is too long than schema. "
|
||||
<< "column_name: " << column_name << "; "
|
||||
<< "input_str: [" << value.to_string() << "] "
|
||||
<< "type: " << slot->type() << "; "
|
||||
<< "schema length: " << char_len << "; "
|
||||
<< "actual length: " << value.size << "; ";
|
||||
return false;
|
||||
}
|
||||
if (slot->type().type == TYPE_CHAR && value.size < char_len) {
|
||||
if (!is_null(value)) {
|
||||
fill_fix_length_string(
|
||||
value, tuple_pool,
|
||||
&value_to_convert, char_len);
|
||||
value_to_convert_length = char_len;
|
||||
}
|
||||
}
|
||||
} else if (slot->type().is_decimal_type()) {
|
||||
bool is_success = check_decimal_input(
|
||||
value, column_type.precision, column_type.scale, error_msg);
|
||||
if (is_success == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!_text_converter->write_slot(
|
||||
slot, tuple, value_to_convert, value_to_convert_length,
|
||||
true, false, tuple_pool)) {
|
||||
(*error_msg) << "convert csv string to "
|
||||
<< slot->type() << " failed. "
|
||||
<< "column_name: " << column_name << "; "
|
||||
<< "input_str: [" << value.to_string() << "]; ";
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Convert one row to this tuple
|
||||
bool BrokerScanner::convert_one_row(
|
||||
const Slice& line,
|
||||
|
||||
@ -81,12 +81,6 @@ private:
|
||||
void split_line(
|
||||
const Slice& line, std::vector<Slice>* values);
|
||||
|
||||
// Writes a slot in _tuple from an value containing text data.
|
||||
bool write_slot(
|
||||
const std::string& column_name, const TColumnType& column_type,
|
||||
const Slice& value, const SlotDescriptor* slot,
|
||||
Tuple* tuple, MemPool* tuple_pool, std::stringstream* error_msg);
|
||||
|
||||
void fill_fix_length_string(
|
||||
const Slice& value, MemPool* pool,
|
||||
char** new_value_p, int new_value_length);
|
||||
|
||||
@ -76,7 +76,6 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) {
|
||||
{
|
||||
COUNTER_UPDATE(_rows_read_counter, 1);
|
||||
SCOPED_TIMER(_materialize_timer);
|
||||
_counter->num_rows_total++;
|
||||
if (fill_dest_tuple(Slice(), tuple, tuple_pool)) {
|
||||
break;// break iff true
|
||||
}
|
||||
|
||||
@ -618,7 +618,9 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
|
||||
COUNTER_SET(_validate_data_timer, _validate_data_ns);
|
||||
COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns);
|
||||
COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns);
|
||||
state->update_num_rows_load_total(_number_input_rows);
|
||||
// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
|
||||
int64_t num_rows_load_total = _number_input_rows + state->num_rows_load_filtered() + state->num_rows_load_unselected();
|
||||
state->update_num_rows_load_total(num_rows_load_total);
|
||||
state->update_num_rows_load_filtered(_number_filtered_rows);
|
||||
|
||||
// print log of add batch time of all node, for tracing load performance easily
|
||||
|
||||
Reference in New Issue
Block a user