[fix](load) delta writer init failed might cause data inconsistency between multiple replicas (#15058)

In the following case, data inconsistency can happen between multiple replicas:

1. The delta writer only writes a few rows of data, which means the write() method is called only once.
2. The writer fails inside init() (which is called the first time write() is called), and the current tablet is recorded in _broken_tablets.
3. The delta writer is closed. In the close() method, the writer finds it is not inited, treats this case as an empty load, and tries to init again, which creates an empty rowset.
4. The tablet sink receives the error report in the RPC response and marks the replica as failed, but since a quorum of replicas succeeded, the subsequent load commit operation succeeds.
5. The FE sends the publish version task to each BE, and the BE holding only the empty rowset publishes the version successfully.
6. We end up with 2 replicas with data and 1 empty replica.
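
A minimal, self-contained sketch of the failure mode above (the class and member names are illustrative only, not the real DeltaWriter; it also assumes, as the scenario describes, that the failed init() would succeed on a retry): write() fails inside init(), yet close() only sees "not initialized", falls back to the empty-load path, and ends up producing an empty rowset.

#include <iostream>

struct Status {
    bool ok;
    static Status OK() { return {true}; }
    static Status Error() { return {false}; }
};

struct BuggyWriterSketch {
    bool is_init = false;
    bool first_init_fails = true;    // model a transient failure on the first init()
    bool wrote_empty_rowset = false;

    Status init() {
        if (first_init_fails) {      // the first attempt fails ...
            first_init_fails = false;
            return Status::Error();
        }
        is_init = true;              // ... a later attempt succeeds
        return Status::OK();
    }

    Status write() {                 // called exactly once in the described scenario
        if (!is_init) {
            Status st = init();
            if (!st.ok) return st;   // the caller records this tablet in _broken_tablets
        }
        return Status::OK();
    }

    Status close() {
        if (!is_init) {
            // Pre-fix logic: "not initialized" is treated as "no data loaded for this
            // tablet", so close() inits again and an EMPTY rowset gets created later.
            Status st = init();
            if (!st.ok) return st;
            wrote_empty_rowset = true;
        }
        return Status::OK();
    }
};

int main() {
    BuggyWriterSketch w;
    w.write();                       // fails inside init(); the tablet is now "broken"
    w.close();                       // re-inits and creates an empty rowset anyway
    std::cout << "empty rowset created: " << w.wrote_empty_rowset << std::endl;  // prints 1
}
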
Author: zhannngchen
Date: 2022-12-16 22:07:00 +08:00
Committed by: GitHub
Parent: 6d5251af78
Commit: 0cd791ec57
2 changed files with 14 additions and 7 deletions


@@ -322,7 +322,7 @@ Status DeltaWriter::close() {
         // if this delta writer is not initialized, but close() is called.
         // which means this tablet has no data loaded, but at least one tablet
         // in same partition has data loaded.
-        // so we have to also init this DeltaWriter, so that it can create a empty rowset
+        // so we have to also init this DeltaWriter, so that it can create an empty rowset
         // for this tablet when being closed.
         RETURN_NOT_OK(init());
     }
@@ -413,7 +413,7 @@ Status DeltaWriter::cancel() {
 Status DeltaWriter::cancel_with_status(const Status& st) {
     std::lock_guard<std::mutex> l(_lock);
-    if (!_is_init || _is_cancelled) {
+    if (_is_cancelled) {
         return Status::OK();
     }
     _mem_table.reset();
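
Dropping `!_is_init` from the early return is what makes the `cancel_with_status()` call added in `add_batch()` (later in this diff) effective: a writer that failed inside `init()` is by definition not initialized, so under the old guard cancelling it would have been a silent no-op. A tiny sketch of the behavioral difference, using illustrative member names rather than the real class:

#include <cassert>

struct WriterStateSketch {
    bool _is_init = false;       // a writer whose init() failed stays un-inited
    bool _is_cancelled = false;

    // Old guard: an uninitialized writer can never be marked cancelled.
    void cancel_old() {
        if (!_is_init || _is_cancelled) return;
        _is_cancelled = true;
    }

    // New guard: cancellation sticks even when init() never succeeded, so the
    // channel can treat this writer as broken instead of as an empty load.
    void cancel_new() {
        if (_is_cancelled) return;
        _is_cancelled = true;
    }
};

int main() {
    WriterStateSketch a, b;
    a.cancel_old();              // no-op: the writer was never inited
    b.cancel_new();              // takes effect regardless of init state
    assert(!a._is_cancelled && b._is_cancelled);
}
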


@@ -121,6 +121,14 @@ Status TabletsChannel::close(
                 // just skip this tablet(writer) and continue to close others
                 continue;
             }
+            // make sure a tablet writer in `_broken_tablets` won't call the `close_wait` method.
+            // `close_wait` might create the rowset and commit the txn directly, and the subsequent
+            // publish version task would succeed, which can cause replica inconsistency.
+            if (_broken_tablets.find(it.second->tablet_id()) != _broken_tablets.end()) {
+                LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled"
+                             << ", tablet_id=" << it.first << ", transaction_id=" << _txn_id;
+                continue;
+            }
             need_wait_writers.insert(it.second);
         } else {
             auto st = it.second->cancel();
@@ -182,11 +190,9 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
                                  const bool write_single_replica) {
     Status st = writer->close_wait(slave_tablet_nodes, write_single_replica);
     if (st.ok()) {
-        if (_broken_tablets.find(writer->tablet_id()) == _broken_tablets.end()) {
-            PTabletInfo* tablet_info = tablet_vec->Add();
-            tablet_info->set_tablet_id(writer->tablet_id());
-            tablet_info->set_schema_hash(writer->schema_hash());
-        }
+        PTabletInfo* tablet_info = tablet_vec->Add();
+        tablet_info->set_tablet_id(writer->tablet_id());
+        tablet_info->set_schema_hash(writer->schema_hash());
     } else {
         PTabletError* tablet_error = tablet_errors->Add();
         tablet_error->set_tablet_id(writer->tablet_id());
@@ -484,6 +490,7 @@ Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
             PTabletError* error = tablet_errors->Add();
             error->set_tablet_id(tablet_to_rowidxs_it.first);
             error->set_msg(err_msg);
+            tablet_writer_it->second->cancel_with_status(st);
             _broken_tablets.insert(tablet_to_rowidxs_it.first);
             // continue write to other tablet.
             // the error will return back to sender.
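
Taken together, the last two hunks are the core of the fix: add_batch() now cancels a writer whose write() failed and records its tablet in _broken_tablets, and close() skips those tablets so close_wait() is never reached and no empty rowset is committed for them. A condensed, self-contained sketch of that post-fix flow, using simplified types and illustrative names (the real TabletsChannel is more involved):

#include <cstdint>
#include <iostream>
#include <map>
#include <set>

struct WriterSketch {
    bool write_fails = false;     // model a writer whose write()/init() fails
    bool cancelled = false;
    bool committed_rowset = false;

    bool write() { return !write_fails; }
    void cancel() { cancelled = true; }             // effective even if never inited (see the DeltaWriter hunk)
    void close_wait() { committed_rowset = true; }  // would build and commit a rowset
};

int main() {
    std::map<int64_t, WriterSketch> writers = {{1, {}}, {2, {}}, {3, {true}}};
    std::set<int64_t> broken_tablets;

    // add_batch(): on a failed write, cancel the writer and remember the tablet as broken.
    for (auto& [tablet_id, w] : writers) {
        if (!w.write()) {
            w.cancel();
            broken_tablets.insert(tablet_id);
        }
    }

    // close(): skip broken tablets so close_wait() never creates an empty rowset for them.
    for (auto& [tablet_id, w] : writers) {
        if (broken_tablets.count(tablet_id) > 0) continue;
        w.close_wait();
    }

    for (const auto& [tablet_id, w] : writers) {
        std::cout << "tablet " << tablet_id << " rowset committed: " << w.committed_rowset << "\n";
    }
}
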