handle create rowset error to avoid null pointer exception (#30670)

This commit is contained in:
HHoflittlefish777
2024-02-01 11:51:51 +08:00
committed by GitHub
parent 92cad69fc4
commit 4f1d76d646
3 changed files with 77 additions and 9 deletions

View File

@ -173,8 +173,9 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
(_context.partial_update_info && _context.partial_update_info->is_partial_update)) {
return Status::OK();
}
auto rowset = _build_tmp();
auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
RowsetSharedPtr rowset_ptr;
RETURN_IF_ERROR(_build_tmp(rowset_ptr));
auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments));
std::vector<RowsetSharedPtr> specified_rowsets;
@ -184,7 +185,7 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
specified_rowsets = tablet->get_rowset_by_ids(&_context.mow_context->rowset_ids);
}
OlapStopWatch watch;
RETURN_IF_ERROR(tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,
RETURN_IF_ERROR(tablet->calc_delete_bitmap(rowset_ptr, segments, specified_rowsets,
_context.mow_context->delete_bitmap,
_context.mow_context->max_version, nullptr));
size_t total_rows = std::accumulate(
@ -657,19 +658,20 @@ void BaseBetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset
}
}
RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() {
Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>();
rowset_meta_->init(_rowset_meta.get());
_build_rowset_meta(rowset_meta_);
RowsetSharedPtr rowset;
auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir,
rowset_meta_, &rowset);
rowset_meta_, &rowset_ptr);
DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
{ status = Status::InternalError("create rowset failed"); });
if (!status.ok()) {
LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
return nullptr;
return status;
}
return rowset;
return Status::OK();
}
Status BaseBetaRowsetWriter::_create_file_writer(std::string path, io::FileWriterPtr& file_writer) {

View File

@ -141,7 +141,7 @@ protected:
virtual Status _check_segment_number_limit();
virtual int64_t _num_seg() const;
// build a tmp rowset for load segment to calc delete_bitmap for this segment
RowsetSharedPtr _build_tmp();
Status _build_tmp(RowsetSharedPtr& rowset_ptr);
RowsetWriterContext _context;
std::shared_ptr<RowsetMeta> _rowset_meta;

View File

@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_rowset_writer_fault", "nonConcurrent") {
sql """ DROP TABLE IF EXISTS `baseall` """
sql """
CREATE TABLE IF NOT EXISTS `baseall` (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double null comment "",
`k9` float null comment "",
`k12` string null comment "",
`k13` largeint(40) null comment ""
) engine=olap
UNIQUE KEY (k0)
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
GetDebugPoint().clearDebugPointsForAllBEs()
def injection = "BaseBetaRowsetWriter::_build_tmp.create_rowset_failed"
try {
GetDebugPoint().enableDebugPointForAllBEs(injection)
streamLoad {
table "baseall"
db "regression_test_fault_injection_p0"
set 'column_separator', ','
file "baseall.txt"
check { result, exception, startTime, endTime ->
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
assertEquals("fail", json.Status.toLowerCase())
}
}
} catch(Exception e) {
logger.info(e.getMessage())
assertTrue(e.getMessage().contains(error_msg))
} finally {
GetDebugPoint().disableDebugPointForAllBEs(injection)
}
sql """ DROP TABLE IF EXISTS `baseall` """
}