branch-2.1: [fix](csv reader) fix data loss when concurrently reading with a multi-char line delimiter (#53374) (#53635)

pick (#53374)

Split points for concurrent file reads are determined in the plan
phase. If a split point happens to fall in the middle of a multi-char
line delimiter:

- The preceding concurrent reader reads the complete row1 and then
reads slightly past its range end to consume the line delimiter.
- The following concurrent reader starts from the middle of the
multi-char line delimiter, so row2 becomes the first line of its range;
but the first line of a non-first range is always discarded, so row2 is
lost (see the sketch below).
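A minimal sketch of the idea behind the fix (the helper name `pre_read_length` and the `main` driver are illustrative, not Doris code): instead of rewinding the start of a non-first range by a single byte, rewind it by up to the full delimiter length, clamped at the start of the file, so the discarded first line always covers any delimiter that straddles the split point.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>

// Illustrative helper (not the actual Doris API): how many bytes a
// non-first range should pre-read before its planned start offset so
// that a line delimiter straddling the split point is fully consumed
// by the skipped first line.
int64_t pre_read_length(int64_t start_offset, const std::string& line_delimiter) {
    // Rewind by at most the delimiter length; clamp by start_offset so
    // we never seek before the beginning of the file.
    return std::min(static_cast<int64_t>(line_delimiter.size()), start_offset);
}

int main() {
    const std::string delim = "\nline_delimiter"; // 15-byte delimiter used by the test below
    int64_t start_offset = 1000;                  // planned split point of a non-first range
    int64_t pre = pre_read_length(start_offset, delim);
    // The range is widened to [985, ...): the 15 extra bytes are absorbed
    // by the skipped first line, so the row after the delimiter survives.
    std::cout << "pre-read " << pre << " bytes, start at " << (start_offset - pre) << "\n";
}
```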

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [ ] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
hui lai
2025-07-27 07:57:26 +08:00
committed by GitHub
parent 8869783458
commit 372627231c
3 changed files with 159 additions and 2 deletions


@@ -302,8 +302,12 @@ Status CsvReader::init_reader(bool is_load) {
         _file_compress_type != TFileCompressType::PLAIN)) {
         return Status::InternalError<false>("For now we do not support split compressed file");
     }
-    start_offset -= 1;
-    _size += 1;
+    // pre-read to promise first line skipped always read
+    int64_t pre_read_len = std::min(
+            static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
+            start_offset);
+    start_offset -= pre_read_len;
+    _size += pre_read_len;
     // not first range will always skip one line
     _skip_lines = 1;
 }
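For example, with the 15-byte delimiter "\nline_delimiter" used by the regression test below and a non-first range planned to start at offset 1000, pre_read_len = min(15, 1000) = 15, so the range start moves back to 985 and _size grows by 15. The old code rewound only 1 byte, which is insufficient whenever the split point lands more than one byte into the delimiter.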


@@ -0,0 +1,76 @@
CREATE TABLE `test_multi_char_line_delimiter` (
`col1` bigint NULL,
`col2` bigint NULL,
`col3` varchar(765) NULL,
`col4` varchar(765) NULL,
`col5` bigint NULL,
`col6` bigint NULL,
`col7` int NULL,
`col8` int NULL,
`col9` tinyint NULL,
`col10` bigint NULL,
`col11` datetime NULL,
`col12` bigint NULL,
`col13` bigint NULL,
`col14` int NULL,
`col15` bigint MIN NULL,
`col16` decimal(19,4) SUM NULL,
`col17` decimal(19,4) SUM NULL,
`col18` decimal(19,4) SUM NULL,
`col19` decimal(19,4) SUM NULL,
`col20` decimal(19,4) SUM NULL,
`col21` decimal(19,4) SUM NULL,
`col22` decimal(19,4) SUM NULL,
`col23` decimal(19,4) SUM NULL,
`col24` decimal(19,4) SUM NULL,
`col25` decimal(19,4) SUM NULL,
`col26` decimal(19,4) SUM NULL,
`col27` decimal(19,4) SUM NULL,
`col28` decimal(19,4) SUM NULL,
`col29` decimal(19,4) SUM NULL,
`col30` decimal(19,4) SUM NULL,
`col31` decimal(19,4) SUM NULL,
`col32` decimal(19,4) SUM NULL,
`col33` decimal(19,4) SUM NULL DEFAULT "0",
`col34` decimal(19,4) SUM NULL DEFAULT "0",
`col35` decimal(19,4) SUM NULL DEFAULT "0",
`col36` decimal(19,4) SUM NULL DEFAULT "0",
`col37` decimal(19,4) SUM NULL DEFAULT "0",
`col38` decimal(19,4) SUM NULL DEFAULT "0",
`col39` decimal(19,4) SUM NULL DEFAULT "0",
`col40` decimal(19,4) SUM NULL DEFAULT "0",
`col41` decimal(19,4) SUM NULL DEFAULT "0",
`col42` decimal(19,4) SUM NULL DEFAULT "0",
`col43` decimal(19,4) SUM NULL DEFAULT "0",
`col44` decimal(19,4) SUM NULL DEFAULT "0",
`col45` decimal(19,4) SUM NULL DEFAULT "0",
`col46` decimal(19,4) SUM NULL DEFAULT "0",
`col47` decimal(19,4) SUM NULL DEFAULT "0",
`col48` decimal(19,4) SUM NULL DEFAULT "0",
`col49` decimal(19,4) SUM NULL DEFAULT "0",
`col50` decimal(19,4) SUM NULL DEFAULT "0",
`col51` decimal(19,4) SUM NULL,
`col52` datetime MIN NULL,
`col53` bigint MIN NULL,
`col54` datetime MAX NULL,
`col55` bigint MAX NULL,
`col56` tinyint MIN NULL,
`col57` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY
) ENGINE=OLAP
AGGREGATE KEY(`col1`, `col2`, `col3`, `col4`, `col5`, `col6`, `col7`, `col8`, `col9`, `col10`, `col11`, `col12`, `col13`, `col14`)
PARTITION BY RANGE(`col12`, `col11`)
(PARTITION p_default VALUES [("0", '1900-01-01 00:00:00'), ("99999", '2030-01-01 00:00:00')))
DISTRIBUTED BY HASH(`col8`) BUCKETS 1
PROPERTIES (
"file_cache_ttl_seconds" = "0",
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false",
"group_commit_interval_ms" = "10000",
"group_commit_data_bytes" = "134217728",
"replication_allocation" = "tag.location.default: 1"
);


@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_multi_char_line_delimiter", "p2") {
def s3BucketName = getS3BucketName()
def s3Endpoint = getS3Endpoint()
def s3Region = getS3Region()
def ak = getS3AK()
def sk = getS3SK()
def tableName = "test_multi_char_line_delimiter"
def label = "test_multi_char_line_delimiter"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
sql """
LOAD LABEL ${label}
(
DATA INFILE("s3://${s3BucketName}/regression/load/data/test_multi_char_line_delimiter*.csv")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY "\tcolumn_separator"
LINES TERMINATED BY "\nline_delimiter"
FORMAT AS CSV
(`col1`,`col2`,`col3`,`col4`,`col5`,`col6`,`col7`,`col8`,`col9`,`col10`,`col11`,`col12`,`col13`,`col14`,`col15`,`col16`,`col17`,`col18`,`col19`,`col20`,`col21`,`col22`,`col23`,`col24`,`col25`,`col26`,`col27`,`col28`,`col29`,`col30`,`col31`,`col32`,`col33`,`col34`,`col35`,`col36`,`col37`,`col38`,`col39`,`col40`,`col41`,`col42`,`col43`,`col44`,`col45`,`col46`,`col47`,`col48`,`col49`,`col50`,`col51`,`col52`,`col53`,`col54`,`col55`,`col56`,`col57`)
SET(`col1`=`col1`,`col2`=`col2`,`col3`=`col3`,`col4`=`col4`,`col5`=`col5`,`col6`=`col6`,`col7`=`col7`,`col8`=`col8`,`col9`=`col9`,`col10`=`col10`,`col11`=`col11`,`col12`=`col12`,`col13`=`col13`,`col14`=`col14`,`col15`=`col15`,`col16`=`col16`,`col17`=`col17`,`col18`=`col18`,`col19`=`col19`,`col20`=`col20`,`col21`=`col21`,`col22`=`col22`,`col23`=`col23`,`col24`=`col24`,`col25`=`col25`,`col26`=`col26`,`col27`=`col27`,`col28`=`col28`,`col29`=`col29`,`col30`=`col30`,`col31`=`col31`,`col32`=`col32`,`col33`=`col33`,`col34`=`col34`,`col35`=`col35`,`col36`=`col36`,`col37`=`col37`,`col38`=`col38`,`col39`=`col39`,`col40`=`col40`,`col41`=`col41`,`col42`=`col42`,`col43`=`col43`,`col44`=`col44`,`col45`=`col45`,`col46`=`col46`,`col47`=`col47`,`col48`=`col48`,`col49`=`col49`,`col50`=`col50`,`col51`=`col51`,`col52`=`col52`,`col53`=`col53`,`col54`=`col54`,`col55`=`col55`,`col56`=`col56`,col57=bitmap_from_string(col57))
)
WITH S3
(
"s3.region" = "${s3Region}",
"s3.endpoint" = "${s3Endpoint}",
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}"
)
PROPERTIES
(
"timeout" = "3600",
"load_parallelism" = "4"
);
"""
def max_try_milli_secs = 600000
while (max_try_milli_secs > 0) {
def String[][] result = sql """ show load where label="$label"; """
logger.info("Load status: " + result[0][2] + ", label: $label")
if (result[0][2].equals("FINISHED")) {
logger.info("Load FINISHED " + label)
break;
}
if (result[0][2].equals("CANCELLED")) {
assertTrue(false, "load failed: $result")
break;
}
Thread.sleep(1000)
max_try_milli_secs -= 1000
if(max_try_milli_secs <= 0) {
assertTrue(1 == 2, "load Timeout: $label")
}
}
def result = sql """ select count(*) from ${tableName}; """
logger.info("result: ${result[0][0]}")
assertTrue(result[0][0] == 2060625, "load result is not correct")
sql """ DROP TABLE IF EXISTS ${tableName} """
}
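The regression test above loads the CSV files with "load_parallelism" = "4" and the 15-byte delimiter "\nline_delimiter", then asserts the exact row count of 2,060,625: before this fix, any row immediately after a split point that landed inside the delimiter would be silently dropped and the count would fall short.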