diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 856a9af261..6b0cfb3e10 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -322,7 +322,7 @@ Status DeltaWriter::close() { // if this delta writer is not initialized, but close() is called. // which means this tablet has no data loaded, but at least one tablet // in same partition has data loaded. - // so we have to also init this DeltaWriter, so that it can create a empty rowset + // so we have to also init this DeltaWriter, so that it can create an empty rowset // for this tablet when being closed. RETURN_NOT_OK(init()); } @@ -413,7 +413,7 @@ Status DeltaWriter::cancel() { Status DeltaWriter::cancel_with_status(const Status& st) { std::lock_guard l(_lock); - if (!_is_init || _is_cancelled) { + if (_is_cancelled) { return Status::OK(); } _mem_table.reset(); diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 0624bcb94a..8096bd1ba8 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -121,6 +121,14 @@ Status TabletsChannel::close( // just skip this tablet(writer) and continue to close others continue; } + // to make sure tablet writer in `_broken_tablets` won't call `close_wait` method. + // `close_wait` might create the rowset and commit txn directly, and the subsequent + // publish version task will success, which can cause the replica inconsistency. + if (_broken_tablets.find(it.second->tablet_id()) != _broken_tablets.end()) { + LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled" + << ", tablet_id=" << it.first << ", transaction_id=" << _txn_id; + continue; + } need_wait_writers.insert(it.second); } else { auto st = it.second->cancel(); @@ -182,11 +190,9 @@ void TabletsChannel::_close_wait(DeltaWriter* writer, const bool write_single_replica) { Status st = writer->close_wait(slave_tablet_nodes, write_single_replica); if (st.ok()) { - if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) { - PTabletInfo* tablet_info = tablet_vec->Add(); - tablet_info->set_tablet_id(writer->tablet_id()); - tablet_info->set_schema_hash(writer->schema_hash()); - } + PTabletInfo* tablet_info = tablet_vec->Add(); + tablet_info->set_tablet_id(writer->tablet_id()); + tablet_info->set_schema_hash(writer->schema_hash()); } else { PTabletError* tablet_error = tablet_errors->Add(); tablet_error->set_tablet_id(writer->tablet_id()); @@ -484,6 +490,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, PTabletError* error = tablet_errors->Add(); error->set_tablet_id(tablet_to_rowidxs_it.first); error->set_msg(err_msg); + tablet_writer_it->second->cancel_with_status(st); _broken_tablets.insert(tablet_to_rowidxs_it.first); // continue write to other tablet. // the error will return back to sender.