Files
oceanbase/deps/oblib/unittest/lib/hash/test_link_hashmap.cpp
2023-10-20 06:47:20 +00:00

411 lines
10 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <thread>
#include "lib/hash/ob_link_hashmap.h"
#include "lib/hash/ob_hashutils.h"
#include "lib/allocator/ob_malloc.h"
#include "lib/utility/ob_print_utils.h"
#include "gtest/gtest.h"
using namespace oceanbase;
using namespace common;
using namespace hash;
class HashKey
{
public:
HashKey(): v_(0) {}
HashKey(uint64_t v): v_(v) {}
HashKey(const HashKey& that): v_(that.v_) {}
int compare(const HashKey& that) {
int ret = 0;
if (v_ > that.v_) {
ret = 1;
} else if (v_ < that.v_) {
ret = -1;
}
return ret;
}
uint64_t hash() const { return v_; }
uint64_t v_;
};
class HashValue : public LinkHashValue<HashKey>
{
public:
HashValue(): v_(0) {}
HashValue(uint64_t v): v_(v) {}
HashValue(const HashValue& that): v_(that.v_) {}
uint64_t v_;
};
static uint64_t value_alloc CACHE_ALIGNED;
static uint64_t value_free CACHE_ALIGNED;
static uint64_t node_alloc CACHE_ALIGNED;
static uint64_t node_free CACHE_ALIGNED;
ObMemAttr attr(1, ObNewModIds::OB_MEMSTORE);
static int64_t STEP = 0;
class TestAllocHandle
{
typedef LinkHashNode<HashKey> Node;
public:
TestAllocHandle() : is_inited_(true) {}
~TestAllocHandle() { ATOMIC_STORE(&is_inited_, false); }
HashValue* alloc_value()
{
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
ATOMIC_INC(&value_alloc);
HashValue* value = (HashValue*)ob_malloc(sizeof(HashValue), attr);
new(value) HashValue();
return value;
}
void free_value(HashValue* val)
{
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
ATOMIC_INC(&value_free);
val->~HashValue();
ob_free(val);
}
Node* alloc_node(HashValue* val)
{
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
UNUSED(val);
ATOMIC_INC(&node_alloc);
Node* node = (Node*)ob_malloc(sizeof(Node), attr);
new(node) Node();
return node;
}
void free_node(Node* node)
{
if (ATOMIC_LOAD(&STEP) == 1) {
IGNORE_RETURN ATOMIC_BCAS(&STEP, 1, 2);
usleep(1 * 1000 * 1000);
}
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
ATOMIC_INC(&node_free);
node->~Node();
ob_free(node);
}
private:
bool is_inited_;
};
class AtomicGetFunctor
{
public:
explicit AtomicGetFunctor() : ret_val_(0) {}
void operator()(const HashKey &key, HashValue *value)
{
UNUSED(key);
ret_val_ = value->v_;
}
uint64_t ret_val_;
};
static bool print(HashKey &key, HashValue *value) { UNUSED(key); _OB_LOG(INFO, "key=%lu val=%lu", key.v_, value->v_); return true; }
typedef ObLinkHashMap<HashKey, HashValue, TestAllocHandle> Hashmap;
TEST(TestObHashMap, Feature)
{
Hashmap hm;
HashKey key;
HashValue *val_ptr = nullptr;
EXPECT_EQ(OB_SUCCESS, hm.init());
// insert
key.v_ = 1;
EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
hm.revert(val_ptr);
// 1:0
// insert key of different bucket
key.v_ = 2;
EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
hm.revert(val_ptr);
// 1:0, 2:0
// insert key of common bucket
key.v_ = 1;
EXPECT_EQ(OB_ENTRY_EXIST, hm.create(key, val_ptr));
// 1:0, 2:0
// insert
key.v_ = 3;
EXPECT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
val_ptr->v_ = 1;
EXPECT_EQ(OB_SUCCESS, hm.insert_and_get(key, val_ptr));
hm.revert(val_ptr);
// 1:0, 2:0, 3:1
// insert key of different bucket
key.v_ = 4;
EXPECT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
val_ptr->v_ = 1;
EXPECT_EQ(OB_SUCCESS, hm.insert_and_get(key, val_ptr));
hm.revert(val_ptr);
// 1:0, 2:0, 3:1, 4:1
// insert key of common bucket
key.v_ = 3;
EXPECT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
val_ptr->v_ = 2;
EXPECT_EQ(OB_ENTRY_EXIST, hm.insert_and_get(key, val_ptr));
hm.free_value(val_ptr);
// 1:0, 2:0, 3:1, 4:1
// delete existing key
key.v_ = 1;
EXPECT_EQ(OB_SUCCESS, hm.del(key));
// 2:0, 3:1, 4:1
// delete not existing key
key.v_ = 5;
EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.del(key));
// 2:0, 3:1, 4:1
// query existing key
key.v_ = 2;
val_ptr = nullptr;
EXPECT_EQ(OB_SUCCESS, hm.get(key, val_ptr));
val_ptr->v_ +=1 ;
hm.revert(val_ptr);
// 2:1, 3:1, 4:1
// query not existing key
key.v_ = 6;
val_ptr = nullptr;
EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.get(key, val_ptr));
// 2:1, 3:1, 4:1
// query existing key
key.v_ = 3;
EXPECT_EQ(OB_ENTRY_EXIST, hm.contains_key(key));
// 2:1, 3:1, 4:1
// query not existing key
key.v_ = 7;
EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.contains_key(key));
// 2:1, 3:1, 4:1
EXPECT_EQ(OB_SUCCESS, hm.for_each(print));
// 2:1, 3:1, 4:1
AtomicGetFunctor fn;
key.v_ = 2;
EXPECT_EQ(OB_SUCCESS, hm.operate(key, fn));
EXPECT_EQ(1, fn.ret_val_);
hm.reset();
hm.purge();
EXPECT_EQ(0, hm.count());
EXPECT_EQ(0, hm.size());
EXPECT_EQ(value_free, value_alloc);
EXPECT_EQ(node_free, node_alloc);
}
TEST(TestObHashMap, Stress)
{
constexpr int64_t THREAD_COUNT = 8;
constexpr int64_t DATA_COUNT_PER_THREAD = 2048;
Hashmap hm(1024);
// 0 for not inserted, 1 for inserted, 2 for inserted but deleted soon
int db[DATA_COUNT_PER_THREAD * THREAD_COUNT] CACHE_ALIGNED;
memset(db, 0, sizeof(db));
EXPECT_EQ(OB_SUCCESS, hm.init());
// insert continuous DATA_COUNT_PER_THREAD nodes in each thread
std::thread insert_threads[THREAD_COUNT];
for (auto i = 0; i < THREAD_COUNT; ++i) {
insert_threads[i] = std::thread([&, i]() {
HashKey key;
HashValue *val_ptr = nullptr;
for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
key.v_ = j + i * DATA_COUNT_PER_THREAD;
EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
ATOMIC_INC(db + key.v_);
hm.revert(val_ptr);
val_ptr = nullptr;
}
});
}
// delete incontinuous DATA_COUNT_PER_THREAD nodes, half of them are duplicated
std::thread del_threads[THREAD_COUNT];
for (auto i = 0; i < THREAD_COUNT; ++i) {
del_threads[i] = std::thread([&, i]() {
HashKey key;
for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
int ret = OB_SUCCESS;
key.v_ = i / 2 + j * THREAD_COUNT;
while (OB_FAIL(hm.del(key)) && ATOMIC_LOAD(db + key.v_) < 2)
;
if (OB_SUCC(ret)) {
ATOMIC_INC(db + key.v_);
} else {
EXPECT_EQ(OB_ENTRY_NOT_EXIST, ret);
}
}
});
}
std::thread get_threads[THREAD_COUNT];
for (auto i = 0; i < THREAD_COUNT / 2; ++i) {
get_threads[i] = std::thread([&, i]() {
int ret = OB_SUCCESS;
HashKey key;
HashValue *val_ptr = nullptr;
for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
key.v_ = j + i * DATA_COUNT_PER_THREAD;
if (OB_SUCC(hm.get(key, val_ptr))) {
hm.revert(val_ptr);
}
val_ptr = nullptr;
}
});
}
for (auto i = THREAD_COUNT / 2; i < THREAD_COUNT; ++i) {
get_threads[i] = std::thread([&, i]() {
HashKey key;
for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
key.v_ = j + i * DATA_COUNT_PER_THREAD;
UNUSED(hm.contains_key(key));
}
});
}
for (auto i = 0; i < THREAD_COUNT; ++i) {
insert_threads[i].join();
}
for (auto i = 0; i < THREAD_COUNT; ++i) {
del_threads[i].join();
}
for (auto i = 0; i < THREAD_COUNT; ++i) {
get_threads[i].join();
}
EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, hm.size());
int counter[3];
memset(counter, 0, sizeof(counter));
for (int i = 0; i < DATA_COUNT_PER_THREAD * THREAD_COUNT; ++i) {
ASSERT_TRUE(db[i] > 0);
ASSERT_TRUE(db[i] <= 2);
++counter[db[i]];
}
EXPECT_EQ(0, counter[0]);
EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, counter[1]);
EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, counter[2]);
hm.reset();
hm.purge();
EXPECT_EQ(0, hm.count());
EXPECT_EQ(0, hm.size());
EXPECT_EQ(value_free, value_alloc);
EXPECT_EQ(node_free, node_alloc);
}
TEST(TestObHashMap, Retire)
{
std::thread* t;
{
Hashmap A;
HashKey key;
HashValue *val_ptr = nullptr;
EXPECT_EQ(OB_SUCCESS, A.init());
key.v_ = 1;
EXPECT_EQ(OB_SUCCESS, A.create(key, val_ptr));
A.revert(val_ptr);
EXPECT_EQ(OB_SUCCESS, A.del(key));
ATOMIC_INC(&STEP);
t = new std::thread([&]() {
usleep(10 * 1000);
Hashmap B;
HashKey key;
EXPECT_EQ(OB_SUCCESS, B.init());
key.v_ = 1;
EXPECT_EQ(OB_SUCCESS, B.create(key, val_ptr));
B.revert(val_ptr);
});
while(ATOMIC_LOAD(&STEP) != 2);
// ~A()
}
t->join();
delete t;
}
/*
TEST(TestObHashMap, Dummy)
{
struct Fn {
bool operator()(HashKey& key, HashValue* value) {
return true;
}
};
Fn fn;
bool stop = false;
constexpr int64_t THREAD_COUNT = 8;
constexpr int64_t DATA_COUNT_PER_THREAD = 81920;
Hashmap hm(2);
EXPECT_EQ(OB_SUCCESS, hm.init());
std::thread insert_threads[THREAD_COUNT];
int64_t current = 0;
for (auto i = 0; i < THREAD_COUNT; ++i) {
insert_threads[i] = std::thread([&]() {
HashKey key;
HashValue *val_ptr = nullptr;
while (ATOMIC_LOAD(&current) < THREAD_COUNT * DATA_COUNT_PER_THREAD) {
key.v_ = ATOMIC_FAA(&current, 1);
EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
hm.revert(val_ptr);
val_ptr = nullptr;
}
});
}
std::thread foreach_threads[THREAD_COUNT];
for (auto i = 0; i < THREAD_COUNT; ++i) {
foreach_threads[i] = std::thread([&]() {
while (!stop) {
hm.for_each(fn);
}
});
}
for (auto i = 0; i < THREAD_COUNT; ++i) {
insert_threads[i].join();
}
hm.reset();
hm.purge();
ATOMIC_STORE(&stop, true);
for (auto i = 0; i < THREAD_COUNT; ++i) {
foreach_threads[i].join();
};
}
*/
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc,argv);
oceanbase::common::ObLogger::get_logger().set_file_name("test_link_hashmap.log", true);
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
return RUN_ALL_TESTS();
}