[fix](load) fix loaded rows error (#28061)

HHoflittlefish777
2023-12-07 14:43:51 +08:00
committed by GitHub
parent 1dabd0af43
commit 3f3843fd4f
3 changed files with 190 additions and 1 deletion


@@ -1522,6 +1522,7 @@ Status VTabletWriter::close(Status exec_status) {
COUNTER_SET(_max_wait_exec_timer, max_wait_exec_time_ns);
COUNTER_SET(_add_batch_number, total_add_batch_num);
COUNTER_SET(_num_node_channels, num_node_channels);
// _number_input_rows doesn't contain num_rows_load_filtered and num_rows_load_unselected in the scan node
int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() +
_state->num_rows_load_unselected();
@@ -1634,6 +1635,7 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
std::shared_ptr<vectorized::Block> block;
bool has_filtered_rows = false;
int64_t filtered_rows = 0;
_number_input_rows += rows;
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
input_block, block, filtered_rows, has_filtered_rows, _row_part_tablet_ids,
@@ -1644,7 +1646,6 @@ Status VTabletWriter::append_block(doris::vectorized::Block& input_block) {
channel_to_payload.resize(_channels.size());
_generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload);
_number_input_rows += rows;
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when the sink is closed.
_state->update_num_rows_load_total(rows);
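
Taken together, the hunks above move `_number_input_rows += rows;` ahead of `generate_rows_distribution(...)`, so a block's rows are counted before row distribution can filter rows or return early with a data-quality error, and `close()` can then report `num_rows_load_total = _number_input_rows + num_rows_load_filtered + num_rows_load_unselected`. A minimal C++ sketch of that accounting, using simplified stand-in types and function names rather than the real Doris classes:

```cpp
#include <cstdint>
#include <iostream>

// Simplified stand-ins for the RuntimeState / VTabletWriter counters;
// illustrative only, not the real Doris types.
struct Counters {
    int64_t number_input_rows = 0;    // rows that reached the tablet sink (_number_input_rows)
    int64_t rows_load_filtered = 0;   // rows dropped upstream (e.g. in the scan node) for data quality
    int64_t rows_load_unselected = 0; // rows skipped by the load's WHERE clause
};

// append_block(): count the block's rows *before* row distribution, so the
// count is not lost when distribution filters rows or bails out early
// (e.g. with a "too many filtered rows" error).
bool append_block(Counters& c, int64_t rows, bool distribution_fails) {
    c.number_input_rows += rows;   // the fix: increment first
    if (distribution_fails) {
        return false;              // an early return no longer drops the count
    }
    // ... distribute rows to index channels ...
    return true;
}

// close(): as in VTabletWriter::close(), number_input_rows excludes rows
// filtered or unselected upstream, so the reported total adds them back.
int64_t num_rows_load_total(const Counters& c) {
    return c.number_input_rows + c.rows_load_filtered + c.rows_load_unselected;
}

int main() {
    Counters c;
    append_block(c, /*rows=*/100, /*distribution_fails=*/true);
    std::cout << num_rows_load_total(c) << "\n"; // prints 100
    return 0;
}
```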

File diff suppressed because one or more lines are too long


@@ -1487,5 +1487,93 @@ suite("test_stream_load", "p0") {
}
sql "sync"
order_qt_temporary_partitions "SELECT * FROM ${tableName18} TEMPORARY PARTITION(test) order by k1"
// test an input whose rows have more columns than the schema.
def tableName19 = "test_input_long_than_schema"
try {
sql """ DROP TABLE IF EXISTS ${tableName19} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName19} (
`c1` varchar(48) NULL,
`c2` varchar(48) NULL,
`c3` varchar(48) NULL,
`c4` varchar(48) NULL,
`c5` varchar(48) NULL,
`c6` varchar(48) NULL,
`c7` varchar(48) NULL,
`c8` varchar(48) NULL,
`c9` varchar(48) NULL,
`c10` varchar(48) NULL,
`c11` varchar(48) NULL,
`c12` varchar(48) NULL,
`c13` varchar(48) NULL,
`c14` varchar(48) NULL,
`c15` varchar(48) NULL,
`c16` varchar(48) NULL,
`c17` varchar(48) NULL,
`c18` varchar(48) NULL,
`c19` varchar(48) NULL,
`c20` varchar(48) NULL,
`c21` varchar(48) NULL,
`c22` varchar(48) NULL,
`c23` varchar(48) NULL,
`c24` varchar(48) NULL,
`c25` varchar(48) NULL,
`c26` varchar(48) NULL,
`c27` varchar(48) NULL,
`c28` varchar(48) NULL,
`c29` varchar(48) NULL,
`c30` varchar(48) NULL,
`c31` varchar(48) NULL,
`c32` varchar(48) NULL,
`c33` varchar(48) NULL,
`c34` varchar(48) NULL,
`c35` varchar(48) NULL,
`c36` varchar(48) NULL,
`c37` varchar(48) NULL,
`c38` varchar(48) NULL,
`c39` varchar(48) NULL,
`c40` varchar(48) NULL,
`c41` varchar(48) NULL,
`c42` varchar(48) NULL,
`c43` varchar(48) NULL,
`c44` varchar(48) NULL,
`c45` varchar(48) NULL,
`c46` varchar(48) NULL,
`c47` varchar(48) NULL,
`c48` varchar(48) NULL,
`c49` varchar(48) NULL,
`c50` varchar(48) NULL
) ENGINE=OLAP
DUPLICATE KEY(`c1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`c1`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "false"
);
"""
streamLoad {
table "${tableName19}"
set 'column_separator', ','
file 'test_input_long_than_schema.csv'
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows"))
assertEquals(100, json.NumberTotalRows)
assertEquals(100, json.NumberFilteredRows)
assertEquals(0, json.NumberUnselectedRows)
}
}
} finally {
sql """ DROP TABLE IF EXISTS ${tableName19} FORCE"""
}
}
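
The assertions in the check block pin down the counter arithmetic the fix restores: with every one of the 100 CSV rows rejected for having more columns than the schema, the reported total should equal loaded plus filtered plus unselected rows. A tiny sketch of that arithmetic, assuming the usual stream-load counter relationship (NumberLoadedRows is not asserted in the test, so treating it as 0 here is an assumption):

```cpp
#include <cassert>
#include <cstdint>

int main() {
    // Hypothetical counter values mirroring the expected result of this test case.
    int64_t loaded_rows = 0;       // assumption: nothing is loaded when the whole batch is rejected
    int64_t filtered_rows = 100;   // asserted: json.NumberFilteredRows == 100
    int64_t unselected_rows = 0;   // asserted: json.NumberUnselectedRows == 0
    int64_t total_rows = loaded_rows + filtered_rows + unselected_rows;
    assert(total_rows == 100);     // matches assertEquals(100, json.NumberTotalRows)
    return 0;
}
```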