[enhancement](merge-on-write) add more version and txn information for mow publish (#21257)
This commit is contained in:
@ -147,6 +147,13 @@ Status EnginePublishVersionTask::finish() {
|
||||
// here and wait pre version publish or lock timeout
|
||||
if (tablet->keys_type() == KeysType::UNIQUE_KEYS &&
|
||||
tablet->enable_unique_key_merge_on_write()) {
|
||||
bool first_time_update = false;
|
||||
if (StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
|
||||
tablet_info.tablet_id, version.second) < 0) {
|
||||
first_time_update = true;
|
||||
StorageEngine::instance()->txn_manager()->update_tablet_version_txn(
|
||||
tablet_info.tablet_id, version.second, transaction_id);
|
||||
}
|
||||
Version max_version;
|
||||
TabletState tablet_state;
|
||||
{
|
||||
@ -156,12 +163,6 @@ Status EnginePublishVersionTask::finish() {
|
||||
}
|
||||
if (tablet_state == TabletState::TABLET_RUNNING &&
|
||||
version.first != max_version.second + 1) {
|
||||
LOG_EVERY_SECOND(INFO)
|
||||
<< "uniq key with merge-on-write version not continuous, "
|
||||
"current max version="
|
||||
<< max_version.second << ", publish_version=" << version.first
|
||||
<< ", tablet_id=" << tablet->tablet_id()
|
||||
<< ", transaction_id=" << _publish_version_req.transaction_id;
|
||||
// If a tablet migrates out and back, the previously failed
|
||||
// publish task may retry on the new tablet, so check
|
||||
// whether the version exists. if not exist, then set
|
||||
@ -171,6 +172,21 @@ Status EnginePublishVersionTask::finish() {
|
||||
_discontinuous_version_tablets->emplace_back(
|
||||
partition_id, tablet_info.tablet_id, version.first);
|
||||
res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>();
|
||||
int64_t missed_version = max_version.second + 1;
|
||||
int64_t missed_txn_id =
|
||||
StorageEngine::instance()->txn_manager()->get_txn_by_tablet_version(
|
||||
tablet->tablet_id(), missed_version);
|
||||
auto msg = fmt::format(
|
||||
"uniq key with merge-on-write version not continuous, "
|
||||
"missed version={}, it's transaction_id={}, current publish "
|
||||
"version={}, tablet_id={}, transaction_id={}",
|
||||
missed_version, missed_txn_id, version.second, tablet->tablet_id(),
|
||||
_publish_version_req.transaction_id);
|
||||
if (first_time_update) {
|
||||
LOG(INFO) << msg;
|
||||
} else {
|
||||
LOG_EVERY_SECOND(INFO) << msg;
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -82,6 +82,9 @@ TxnManager::TxnManager(int32_t txn_map_shard_size, int32_t txn_shard_size)
|
||||
_txn_mutex = new std::shared_mutex[_txn_shard_size];
|
||||
_txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size];
|
||||
_txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size];
|
||||
// For debugging
|
||||
_tablet_version_cache =
|
||||
new ShardedLRUCache("TabletVersionCache", 100000, LRUCacheType::NUMBER, 32);
|
||||
}
|
||||
|
||||
// prepare txn should always be allowed because ingest task will be retried
|
||||
@ -745,4 +748,37 @@ void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) {
|
||||
VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id;
|
||||
}
|
||||
|
||||
int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) {
|
||||
char key[16];
|
||||
memcpy(key, &tablet_id, sizeof(int64_t));
|
||||
memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
|
||||
CacheKey cache_key((const char*)&key, sizeof(key));
|
||||
|
||||
auto handle = _tablet_version_cache->lookup(cache_key);
|
||||
if (handle == nullptr) {
|
||||
return -1;
|
||||
}
|
||||
int64_t res = *(int64_t*)_tablet_version_cache->value(handle);
|
||||
_tablet_version_cache->release(handle);
|
||||
return res;
|
||||
}
|
||||
|
||||
void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) {
|
||||
char key[16];
|
||||
memcpy(key, &tablet_id, sizeof(int64_t));
|
||||
memcpy(key + sizeof(int64_t), &version, sizeof(int64_t));
|
||||
CacheKey cache_key((const char*)&key, sizeof(key));
|
||||
|
||||
int64_t* value = new int64_t;
|
||||
*value = txn_id;
|
||||
auto deleter = [](const doris::CacheKey& key, void* value) {
|
||||
int64_t* cache_value = (int64_t*)value;
|
||||
delete cache_value;
|
||||
};
|
||||
|
||||
auto handle = _tablet_version_cache->insert(cache_key, value, sizeof(txn_id), deleter,
|
||||
CachePriority::NORMAL, sizeof(txn_id));
|
||||
_tablet_version_cache->release(handle);
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -90,6 +90,7 @@ public:
|
||||
delete[] _txn_mutex;
|
||||
delete[] _txn_tablet_delta_writer_map;
|
||||
delete[] _txn_tablet_delta_writer_map_locks;
|
||||
delete _tablet_version_cache;
|
||||
}
|
||||
|
||||
// add a txn to manager
|
||||
@ -172,6 +173,9 @@ public:
|
||||
DeleteBitmapPtr delete_bitmap,
|
||||
const RowsetIdUnorderedSet& rowset_ids);
|
||||
|
||||
int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
|
||||
void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id);
|
||||
|
||||
private:
|
||||
using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
|
||||
|
||||
@ -232,6 +236,7 @@ private:
|
||||
std::shared_mutex* _txn_mutex;
|
||||
|
||||
txn_tablet_delta_writer_map_t* _txn_tablet_delta_writer_map;
|
||||
ShardedLRUCache* _tablet_version_cache;
|
||||
std::shared_mutex* _txn_tablet_delta_writer_map_locks;
|
||||
DISALLOW_COPY_AND_ASSIGN(TxnManager);
|
||||
}; // TxnManager
|
||||
|
||||
@ -332,4 +332,24 @@ TEST_F(TxnManagerTest, DeleteCommittedTxn) {
|
||||
EXPECT_TRUE(status != Status::OK());
|
||||
}
|
||||
|
||||
TEST_F(TxnManagerTest, TabletVersionCache) {
|
||||
std::unique_ptr<TxnManager> txn_mgr = std::make_unique<TxnManager>(64, 1024);
|
||||
txn_mgr->update_tablet_version_txn(123, 100, 456);
|
||||
txn_mgr->update_tablet_version_txn(124, 100, 567);
|
||||
int64_t tx1 = txn_mgr->get_txn_by_tablet_version(123, 100);
|
||||
EXPECT_EQ(tx1, 456);
|
||||
int64_t tx2 = txn_mgr->get_txn_by_tablet_version(124, 100);
|
||||
EXPECT_EQ(tx2, 567);
|
||||
int64_t tx3 = txn_mgr->get_txn_by_tablet_version(124, 101);
|
||||
EXPECT_EQ(tx3, -1);
|
||||
txn_mgr->update_tablet_version_txn(123, 101, 888);
|
||||
txn_mgr->update_tablet_version_txn(124, 101, 890);
|
||||
int64_t tx4 = txn_mgr->get_txn_by_tablet_version(123, 100);
|
||||
EXPECT_EQ(tx4, 456);
|
||||
int64_t tx5 = txn_mgr->get_txn_by_tablet_version(123, 101);
|
||||
EXPECT_EQ(tx5, 888);
|
||||
int64_t tx6 = txn_mgr->get_txn_by_tablet_version(124, 101);
|
||||
EXPECT_EQ(tx6, 890);
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
Reference in New Issue
Block a user