[bug](random distribution) fix data loss and incorrect data in random distribution table #33962

xy720
2024-04-23 16:59:58 +08:00
committed by yiguolei
parent 799c43686c
commit 080c07ad87
4 changed files with 4196 additions and 0 deletions


@@ -518,6 +518,31 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
SCOPED_RAW_TIMER(&_stat.append_node_channel_ns);
if (is_append) {
if (_cur_mutable_block && !_cur_mutable_block->empty()) {
// When is_append is true, the previous block may not have been sent out yet.
// (e.g. the previous block was not loaded to a single tablet, and its row
// count was 4064, which is smaller than the send batch size of 8192.)
// If we clear the previous block directly here, it will cause data loss.
{
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
// To simplify the add_row logic, postpone adding the block to the request until the request is sent
_pending_batches_bytes += _cur_mutable_block->allocated_bytes();
_cur_add_block_request->set_eos(
false); // for multi-add, eos is set only when marking close.
// Copy the current request into a temporary request before adding it to the pending block queue
auto tmp_add_block_request = std::make_shared<PTabletWriterAddBlockRequest>();
*tmp_add_block_request = *_cur_add_block_request;
_pending_blocks.emplace(std::move(_cur_mutable_block), tmp_add_block_request);
_pending_batches_num++;
VLOG_DEBUG << "VTabletWriter:" << _parent << " VNodeChannel:" << this
<< " pending_batches_bytes:" << _pending_batches_bytes
<< " jobid:" << std::to_string(_state->load_job_id())
<< " loadinfo:" << _load_info;
}
_cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty());
_cur_add_block_request->clear_tablet_ids();
}
// Do not split the data of the block by tablets but append it to a single delta writer.
// This is a faster way to send the block than append_to_block_by_selector
// TODO: we could write to local delta writer if single_replica_load is true
@@ -537,6 +562,8 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
for (auto tablet_id : payload->second) {
_cur_add_block_request->add_tablet_ids(tablet_id);
}
// Need to reset this to false to avoid loading data into the wrong tablet.
_cur_add_block_request->set_is_single_tablet_block(false);
}
if (is_append || _cur_mutable_block->rows() >= _batch_size ||
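
To make the two hunks above concrete, here is a minimal, self-contained C++ sketch of the fix's flush-before-reset pattern. It is an illustration only: Block, Request, and Channel below are simplified stand-ins for Doris's vectorized::MutableBlock, PTabletWriterAddBlockRequest, and VNodeChannel, not the real classes.

// Sketch of the flush-before-reset pattern; all types are simplified stand-ins.
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

struct Request {
    bool eos = false;
    bool is_single_tablet_block = false;
    std::vector<int64_t> tablet_ids;
};

struct Block {
    size_t rows = 0;
};

class Channel {
public:
    // Append a block destined for a single tablet (the is_append == true path).
    void add_single_tablet_block(Block block) {
        if (_cur_block && _cur_block->rows > 0) {
            // Buggy version: overwrite _cur_block here, silently dropping the
            // rows of a partial block that never reached the send queue.
            // Fixed version: enqueue the partial block, with a copy of its
            // request, before starting a fresh one.
            flush_current();
        }
        _cur_block = std::make_unique<Block>(std::move(block));
        _cur_request.is_single_tablet_block = true;
    }

    // Append rows split by tablet (the is_append == false path).
    void add_rows(size_t rows, std::vector<int64_t> tablet_ids) {
        if (!_cur_block) _cur_block = std::make_unique<Block>();
        _cur_block->rows += rows;
        _cur_request.tablet_ids = std::move(tablet_ids);
        // Mirrors the second hunk: without this reset, a stale true value left
        // over from a single-tablet append routes these rows to one tablet.
        _cur_request.is_single_tablet_block = false;
    }

    size_t pending() const { return _pending.size(); }

private:
    void flush_current() {
        std::lock_guard<std::mutex> l(_lock);
        _cur_request.eos = false;  // eos is set only when the channel closes
        auto req_copy = std::make_shared<Request>(_cur_request);
        _pending.emplace(std::move(_cur_block), std::move(req_copy));
        _cur_request.tablet_ids.clear();
    }

    std::mutex _lock;
    std::queue<std::pair<std::unique_ptr<Block>, std::shared_ptr<Request>>> _pending;
    std::unique_ptr<Block> _cur_block;
    Request _cur_request;
};

int main() {
    Channel ch;
    ch.add_single_tablet_block(Block{4064});  // partial block, below batch size
    ch.add_single_tablet_block(Block{36});    // the 4064 rows are flushed, not lost
    std::cout << "pending batches: " << ch.pending() << "\n";  // prints 1
}

Enqueuing the partial block (with a copied request) before replacing it is what closes the data-loss hole; resetting is_single_tablet_block on every split-by-tablet append is what stops rows from being routed to a stale single tablet.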

File diff suppressed because it is too large


@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql1 --
4100
-- !sql2 --
2024-03-27 74
2024-03-28 81
2024-04-01 837
2024-04-04 1568
2024-04-05 1540


@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_load_block_to_single_tablet", "p0") {
sql "show tables"
def tableName = "test_load_block_to_single_tablet"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE `${tableName}` (
`ddate` date NULL,
`dhour` varchar(*) NULL,
`server_time` datetime NULL,
`log_type` varchar(*) NULL,
`source_flag` varchar(*) NULL,
`a` varchar(*) NULL,
`ext_json` varchar(*) NULL
) ENGINE=OLAP
DUPLICATE KEY(`ddate`)
PARTITION BY RANGE(`ddate`)
(PARTITION p202403 VALUES [('2024-03-01'), ('2024-04-01')),
PARTITION p202404 VALUES [('2024-04-01'), ('2024-05-01')))
DISTRIBUTED BY RANDOM BUCKETS 16
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
streamLoad {
table "${tableName}"
set 'column_separator', '\t'
set 'columns', 'ddate, dhour, server_time, log_type, source_flag, a, ext_json'
set 'partitions', 'p202403, p202404'
file 'test_load_block_to_single_tablet.csv'
time 10000 // limit inflight 10s
}
sql "sync"
qt_sql1 "select count(*) from ${tableName};"
qt_sql2 "select ddate,count(1) from ${tableName} group by ddate order by ddate;"
}