[fix](txn_manager) Add ingested rowsets to unused rowsets when removing txn (#37417)

Generally speaking, as long as a rowset has a version, it can be
considered not to be in a pending state. However, if the rowset was
created through ingesting binlogs, it will have a version but should
still be considered in a pending state because the ingesting txn has not
yet been committed.

This PR updates the condition for determining the pending state. If a
rowset is COMMITTED, the txn should be allowed to roll back even if a
version exists.

Cherry-pick #36551
This commit is contained in:
walter
2024-07-10 14:25:44 +08:00
committed by GitHub
parent f65d1c4011
commit afcc6170f6
6 changed files with 82 additions and 6 deletions

View File

@ -30,7 +30,16 @@ static bvar::Adder<size_t> g_total_rowset_num("doris_total_rowset_num");
Rowset::Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta)
: _rowset_meta(rowset_meta), _refs_by_reader(0) {
_is_pending = !_rowset_meta->has_version();
_is_pending = true;
// Generally speaking, as long as a rowset has a version, it can be considered not to be in a pending state.
// However, if the rowset was created through ingesting binlogs, it will have a version but should still be
// considered in a pending state because the ingesting txn has not yet been committed.
if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 &&
_rowset_meta->rowset_state() != COMMITTED) {
_is_pending = false;
}
if (_is_pending) {
_is_cumulative = false;
} else {

View File

@ -163,6 +163,7 @@ public:
int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
// remove all files in this rowset
// TODO should we rename the method to remove_files() to be more specific?

View File

@ -710,8 +710,14 @@ void StorageEngine::clear_transaction_task(const TTransactionId transaction_id,
<< ", tablet_uid=" << tablet_info.first.tablet_uid;
continue;
}
static_cast<void>(StorageEngine::instance()->txn_manager()->delete_txn(
partition_id, tablet, transaction_id));
Status s = StorageEngine::instance()->txn_manager()->delete_txn(partition_id, tablet,
transaction_id);
if (!s.ok()) {
LOG(WARNING) << "failed to clear transaction. txn_id=" << transaction_id
<< ", partition_id=" << partition_id
<< ", tablet_id=" << tablet_info.first.tablet_id
<< ", status=" << s.to_string();
}
}
}
LOG(INFO) << "finish to clear transaction task. transaction_id=" << transaction_id;

View File

@ -604,13 +604,14 @@ Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
auto& load_info = load_itr->second;
auto& rowset = load_info->rowset;
if (rowset != nullptr && meta != nullptr) {
if (rowset->version().first > 0) {
if (!rowset->is_pending()) {
return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
"could not delete transaction from engine, just remove it from memory not "
"delete from disk, because related rowset already published. partition_id: "
"{}, transaction_id: {}, tablet: {}, rowset id: {}, version:{}",
"{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}",
key.first, key.second, tablet_info.to_string(),
rowset->rowset_id().to_string(), rowset->version().to_string());
rowset->rowset_id().to_string(), rowset->version().to_string(),
RowsetStatePB_Name(rowset->rowset_meta_state()));
} else {
static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id()));
#ifndef BE_TEST

View File

@ -0,0 +1,22 @@
{
"rowset_id": 10002,
"partition_id": 10001,
"tablet_id": 12046,
"tablet_schema_hash": 365187263,
"rowset_type": "BETA_ROWSET",
"rowset_state": "COMMITTED",
"start_version": 0,
"end_version": 1,
"num_rows": 0,
"total_disk_size": 0,
"data_disk_size": 0,
"index_disk_size": 0,
"empty": true,
"creation_time": 1552911435,
"tablet_uid": {
"hi": 10,
"lo": 10
},
"num_segments": 1,
"has_variant_type_in_schema": false
}

View File

@ -54,6 +54,7 @@ static StorageEngine* k_engine = nullptr;
const std::string rowset_meta_path = "./be/test/olap/test_data/rowset_meta.json";
const std::string rowset_meta_path_2 = "./be/test/olap/test_data/rowset_meta2.json";
const std::string rowset_meta_path_3 = "./be/test/olap/test_data/rowset_meta3.json";
class TxnManagerTest : public testing::Test {
public:
@ -169,6 +170,22 @@ public:
EXPECT_EQ(rowset_meta2->rowset_id(), rowset_id);
EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, rowset_meta_path_2,
rowset_meta2, &_rowset_diff_id));
// init rowset meta 3
_json_rowset_meta = "";
std::ifstream infile3(rowset_meta_path_3);
char buffer3[1024];
while (!infile3.eof()) {
infile3.getline(buffer3, 1024);
_json_rowset_meta = _json_rowset_meta + buffer3 + "\n";
}
_json_rowset_meta = _json_rowset_meta.substr(0, _json_rowset_meta.size() - 1);
rowset_id.init(10002);
RowsetMetaSharedPtr rowset_meta3(new RowsetMeta());
rowset_meta3->init_from_json(_json_rowset_meta);
EXPECT_EQ(rowset_meta3->rowset_id(), rowset_id);
EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema, rowset_meta_path_3,
rowset_meta3, &_rowset_ingested));
_tablet_uid = TabletUid(10, 10);
}
@ -190,6 +207,7 @@ private:
RowsetSharedPtr _rowset;
RowsetSharedPtr _rowset_same_id;
RowsetSharedPtr _rowset_diff_id;
RowsetSharedPtr _rowset_ingested;
};
TEST_F(TxnManagerTest, PrepareNewTxn) {
@ -363,4 +381,23 @@ TEST_F(TxnManagerTest, TabletVersionCache) {
EXPECT_EQ(tx6, 890);
}
TEST_F(TxnManagerTest, DeleteCommittedTxnForIngestingBinlog) {
auto guard = k_engine->pending_local_rowsets().add(_rowset_ingested->rowset_id());
auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid,
load_id, _rowset_ingested, std::move(guard), false);
ASSERT_TRUE(st.ok()) << st;
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset_ingested->rowset_id(),
rowset_meta);
ASSERT_TRUE(st.ok()) << st;
EXPECT_EQ(rowset_meta->rowset_id(), _rowset_ingested->rowset_id());
st = _txn_mgr->delete_txn(_meta, partition_id, transaction_id, tablet_id, _tablet_uid);
ASSERT_TRUE(st.ok()) << st;
RowsetMetaSharedPtr rowset_meta2(new RowsetMeta());
st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid, _rowset_ingested->rowset_id(),
rowset_meta2);
ASSERT_FALSE(st.ok()) << st;
EXPECT_FALSE(k_engine->pending_local_rowsets().contains(_rowset_ingested->rowset_id()));
}
} // namespace doris