cherry-pick #35576
This commit is contained in:
@ -37,7 +37,9 @@ std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
|
||||
return _map.at(tablet_id);
|
||||
}
|
||||
std::shared_ptr<DeltaWriterV2> writer = creator();
|
||||
_map[tablet_id] = writer;
|
||||
if (writer != nullptr) {
|
||||
_map[tablet_id] = writer;
|
||||
}
|
||||
return writer;
|
||||
}
|
||||
|
||||
|
||||
@ -417,17 +417,21 @@ Status VTabletWriterV2::write(Block& input_block) {
|
||||
|
||||
// For each tablet, send its input_rows from block to delta writer
|
||||
for (const auto& [tablet_id, rows] : rows_for_tablet) {
|
||||
Streams streams;
|
||||
RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, rows.index_id, streams));
|
||||
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
|
||||
RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows));
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
|
||||
const Rows& rows, const Streams& streams) {
|
||||
const Rows& rows) {
|
||||
auto delta_writer = _delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
|
||||
Streams streams;
|
||||
auto st = _select_streams(tablet_id, rows.partition_id, rows.index_id, streams);
|
||||
if (!st.ok()) [[unlikely]] {
|
||||
LOG(WARNING) << st << ", load_id=" << print_id(_load_id);
|
||||
return std::unique_ptr<DeltaWriterV2>(nullptr);
|
||||
}
|
||||
WriteRequest req {
|
||||
.tablet_id = tablet_id,
|
||||
.txn_id = _txn_id,
|
||||
|
||||
@ -140,7 +140,7 @@ private:
|
||||
RowsForTablet& rows_for_tablet);
|
||||
|
||||
Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t tablet_id,
|
||||
const Rows& rows, const Streams& streams);
|
||||
const Rows& rows);
|
||||
|
||||
Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id,
|
||||
Streams& streams);
|
||||
|
||||
@ -88,11 +88,11 @@ suite("test_writer_v2_fault_injection", "nonConcurrent") {
|
||||
// VTabletWriterV2 tablet_location is null
|
||||
load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", "unknown tablet location")
|
||||
// VTabletWriterV2 location is null
|
||||
load_with_injection("VTabletWriterV2._select_streams.location_null", "unknown tablet location")
|
||||
load_with_injection("VTabletWriterV2._select_streams.location_null", "failed to open DeltaWriter for tablet")
|
||||
// VTabletWriterV2 cancel
|
||||
load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
|
||||
// DeltaWriterV2 stream_size is 0
|
||||
load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")
|
||||
|
||||
sql """ set enable_memtable_on_sink_node=false """
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user