[fix](pipeline-load) fix no error url when data quality error and total rows is negative (#34072) (#34204)
Co-authored-by: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com>
This commit is contained in:
@ -78,38 +78,25 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
|
||||
}
|
||||
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
|
||||
ctx->commit_infos = std::move(state->tablet_commit_infos());
|
||||
ctx->number_total_rows = state->num_rows_load_total();
|
||||
ctx->number_loaded_rows = state->num_rows_load_success();
|
||||
ctx->number_filtered_rows = state->num_rows_load_filtered();
|
||||
ctx->number_unselected_rows = state->num_rows_load_unselected();
|
||||
int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
|
||||
if (!ctx->group_commit && num_selected_rows > 0 &&
|
||||
(double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
|
||||
// NOTE: Do not modify the error message here, for historical reasons,
|
||||
// some users may rely on this error message.
|
||||
*status = Status::DataQualityError("too many filtered rows");
|
||||
}
|
||||
if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
|
||||
ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
|
||||
}
|
||||
|
||||
if (status->ok()) {
|
||||
ctx->number_total_rows = state->num_rows_load_total();
|
||||
ctx->number_loaded_rows = state->num_rows_load_success();
|
||||
ctx->number_filtered_rows = state->num_rows_load_filtered();
|
||||
ctx->number_unselected_rows = state->num_rows_load_unselected();
|
||||
|
||||
int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
|
||||
if (!ctx->group_commit && num_selected_rows > 0 &&
|
||||
(double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
|
||||
// NOTE: Do not modify the error message here, for historical reasons,
|
||||
// some users may rely on this error message.
|
||||
*status = Status::DataQualityError("too many filtered rows");
|
||||
}
|
||||
if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
|
||||
ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
|
||||
}
|
||||
|
||||
if (status->ok()) {
|
||||
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
|
||||
DorisMetrics::instance()->stream_load_rows_total->increment(
|
||||
ctx->number_loaded_rows);
|
||||
}
|
||||
DorisMetrics::instance()->stream_receive_bytes_total->increment(ctx->receive_bytes);
|
||||
DorisMetrics::instance()->stream_load_rows_total->increment(ctx->number_loaded_rows);
|
||||
} else {
|
||||
if (ctx->group_commit) {
|
||||
ctx->number_total_rows = state->num_rows_load_total();
|
||||
ctx->number_loaded_rows = state->num_rows_load_success();
|
||||
ctx->number_filtered_rows = state->num_rows_load_filtered();
|
||||
ctx->number_unselected_rows = state->num_rows_load_unselected();
|
||||
if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
|
||||
ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
|
||||
}
|
||||
}
|
||||
LOG(WARNING) << "fragment execute failed"
|
||||
<< ", err_msg=" << status->to_string() << ", " << ctx->brief();
|
||||
// cancel body_sink, make sender known it
|
||||
|
||||
@ -1662,6 +1662,12 @@ Status VTabletWriter::write(doris::vectorized::Block& input_block) {
|
||||
bool has_filtered_rows = false;
|
||||
int64_t filtered_rows = 0;
|
||||
_number_input_rows += rows;
|
||||
// update incrementally so that FE can get the progress.
|
||||
// the real 'num_rows_load_total' will be set when sink being closed.
|
||||
_state->update_num_rows_load_total(rows);
|
||||
_state->update_num_bytes_load_total(bytes);
|
||||
DorisMetrics::instance()->load_rows->increment(rows);
|
||||
DorisMetrics::instance()->load_bytes->increment(bytes);
|
||||
|
||||
_row_distribution_watch.start();
|
||||
RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
|
||||
@ -1674,12 +1680,6 @@ Status VTabletWriter::write(doris::vectorized::Block& input_block) {
|
||||
_generate_index_channels_payloads(_row_part_tablet_ids, channel_to_payload);
|
||||
_row_distribution_watch.stop();
|
||||
|
||||
// update incrementally so that FE can get the progress.
|
||||
// the real 'num_rows_load_total' will be set when sink being closed.
|
||||
_state->update_num_rows_load_total(rows);
|
||||
_state->update_num_bytes_load_total(bytes);
|
||||
DorisMetrics::instance()->load_rows->increment(rows);
|
||||
DorisMetrics::instance()->load_bytes->increment(bytes);
|
||||
// Random distribution and the block belongs to a single tablet, we could optimize to append the whole
|
||||
// block into node channel.
|
||||
bool load_block_to_single_tablet =
|
||||
|
||||
@ -0,0 +1,9 @@
|
||||
281747159,apple-store,2009-08-07,price_change,2.99,1.99,"","",2024-02-02T14:08:16+08:00
|
||||
281747159,apple-store,2009-08-13,version_change,1.1.2 (iPhone OS 3.0 Tested),1.1.1 (iPhone OS 3.0 Tested),"","",2024-02-02T14:08:16+08:00
|
||||
281790044,apple-store,2009-08-06,version_change,3.0 (iPhone OS 3.0 Tested),1.3.1 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00
|
||||
281790044,apple-store,2009-08-17,version_change,3.0.1 (iPhone OS 3.0 Tested),3.0 (iPhone OS 3.0 Tested),"","",2024-02-24T19:43:05+08:00
|
||||
281796108,apple-store,2009-08-24,version_change,3.1.0 (iPhone OS 3.0 Tested),3.0.2 (iPhone OS 3.0 Tested),"","",2024-02-10T17:48:26+08:00
|
||||
281941097,apple-store,2009-08-15,version_change,2.6.0 (iPhone OS 3.0 Tested),2.5.0,"","",2024-02-17T11:15:40+08:00
|
||||
281941097,apple-store,2009-08-22,version_change,2.6.3 (iPhone OS 3.0 Tested),2.6.0 (iPhone OS 3.0 Tested),"","",2024-02-17T11:15:40+08:00
|
||||
282614216,apple-store,2009-08-12,version_change,1.4.0 (iPhone OS 3.0 Tested),1.3.0 (iPhone OS 3.0 Tested),"","",2024-02-21T21:57:58+08:00
|
||||
282738621,apple-store,2009-08-12,name_change,ZHI Chinese-English Dictionary,Chinese English Dictionary,"","",2024-02-17T14:20:44+08:00
|
||||
|
@ -151,8 +151,8 @@ suite("test_pipeline_load", "nonConcurrent") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]Encountered unqualified data"))
|
||||
assertEquals(0, json.NumberTotalRows)
|
||||
assertTrue(json.Message.contains("Encountered unqualified data"))
|
||||
assertEquals(100, json.NumberTotalRows)
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
assertEquals(0, json.NumberUnselectedRows)
|
||||
}
|
||||
|
||||
@ -0,0 +1,76 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_stream_load_error_url", "p0") {
|
||||
def tableName = "test_stream_load_error_url"
|
||||
try {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
`product_id` BIGINT NULL,
|
||||
`market_code` VARCHAR(32),
|
||||
`date` DATE NULL,
|
||||
`event_type` VARCHAR(255) NULL,
|
||||
`new_value` TEXT NULL,
|
||||
`old_value` TEXT NULL,
|
||||
`release_note` TEXT NULL,
|
||||
`release_date` TEXT NULL,
|
||||
`u_time` DATETIME NULL
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(`product_id`, `market_code`, `date`, `event_type`)
|
||||
COMMENT 'test_error_url'
|
||||
PARTITION BY RANGE(`date`)
|
||||
(PARTITION p_201001 VALUES [('2010-01-01'), ('2012-01-01')),
|
||||
PARTITION p_201201 VALUES [('2012-01-01'), ('2014-01-01')),
|
||||
PARTITION p_201401 VALUES [('2014-01-01'), ('2016-01-01')),
|
||||
PARTITION p_201601 VALUES [('2016-01-01'), ('2018-01-01')),
|
||||
PARTITION p_201801 VALUES [('2018-01-01'), ('2020-01-01')),
|
||||
PARTITION p_202001 VALUES [('2020-01-01'), ('2022-01-01')),
|
||||
PARTITION p_202201 VALUES [('2022-01-01'), ('2024-01-01')),
|
||||
PARTITION p_202401 VALUES [('2024-01-01'), ('2026-01-01')),
|
||||
PARTITION p_202601 VALUES [('2026-01-01'), ('2028-01-01')),
|
||||
PARTITION p_202801 VALUES [('2028-01-01'), ('2028-12-01')))
|
||||
DISTRIBUTED BY HASH(`product_id`, `market_code`, `date`, `event_type`) BUCKETS 10
|
||||
PROPERTIES (
|
||||
"replication_allocation" = "tag.location.default: 1"
|
||||
);
|
||||
"""
|
||||
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
set 'column_separator', ','
|
||||
set 'columns', 'k1, k2, k3'
|
||||
file 'test_error_url.csv'
|
||||
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertTrue(json.Message.contains("[DATA_QUALITY_ERROR]too many filtered rows"))
|
||||
def (code, out, err) = curl("GET", json.ErrorURL)
|
||||
log.info("error result: " + out)
|
||||
assertTrue(out.contains("actual column number in csv file is more than schema column number.actual number"))
|
||||
log.info("url: " + json.ErrorURL)
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
}
|
||||
}
|
||||
@ -440,7 +440,7 @@ suite("test_partial_update_schema_change", "p0") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberTotalRows)
|
||||
assertEquals(1, json.NumberTotalRows)
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
assertEquals(0, json.NumberUnselectedRows)
|
||||
}
|
||||
@ -1033,7 +1033,7 @@ suite("test_partial_update_schema_change", "p0") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberTotalRows)
|
||||
assertEquals(1, json.NumberTotalRows)
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
assertEquals(0, json.NumberUnselectedRows)
|
||||
}
|
||||
|
||||
@ -444,7 +444,7 @@ suite("test_partial_update_row_store_schema_change", "p0") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberTotalRows)
|
||||
assertEquals(1, json.NumberTotalRows)
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
assertEquals(0, json.NumberUnselectedRows)
|
||||
}
|
||||
@ -1043,7 +1043,7 @@ suite("test_partial_update_row_store_schema_change", "p0") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("fail", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberTotalRows)
|
||||
assertEquals(1, json.NumberTotalRows)
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
assertEquals(0, json.NumberUnselectedRows)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user