411 lines
10 KiB
C++
411 lines
10 KiB
C++
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
|
|
#include <thread>
|
|
#include "lib/hash/ob_link_hashmap.h"
|
|
#include "lib/hash/ob_hashutils.h"
|
|
#include "lib/allocator/ob_malloc.h"
|
|
#include "lib/utility/ob_print_utils.h"
|
|
|
|
#include "gtest/gtest.h"
|
|
|
|
using namespace oceanbase;
|
|
using namespace common;
|
|
using namespace hash;
|
|
|
|
/**
 * Minimal key type used to exercise ObLinkHashMap in this test file.
 *
 * Wraps one 64-bit integer that acts as both the comparison key and the
 * hash code.
 */
class HashKey
{
public:
  HashKey(): v_(0) {}
  // Left non-explicit so existing implicit conversions from integers keep compiling.
  HashKey(uint64_t v): v_(v) {}
  HashKey(const HashKey& that): v_(that.v_) {}

  /**
   * Three-way comparison on the wrapped integer.
   *
   * Declared const because it never modifies the key; this lets it be called
   * on const keys and through const references.
   *
   * @param that key to compare against
   * @return 1 if this key is greater than `that`, -1 if it is smaller,
   *         0 if both hold the same value
   */
  int compare(const HashKey& that) const
  {
    int ret = 0;
    if (v_ > that.v_) {
      ret = 1;
    } else if (v_ < that.v_) {
      ret = -1;
    }
    return ret;
  }

  // Identity hash: the wrapped value itself is returned as the hash code.
  uint64_t hash() const { return v_; }

  uint64_t v_;
};
|
|
|
|
/**
 * Value type stored in the test hash map.
 *
 * Derives from LinkHashValue<HashKey> and carries a single 64-bit payload
 * that the tests read and write directly.
 */
class HashValue : public LinkHashValue<HashKey>
{
public:
  // Payload starts at zero.
  HashValue()
    : v_(0)
  {}

  // Payload starts at the given number.
  HashValue(uint64_t payload)
    : v_(payload)
  {}

  // Copies only the payload; the LinkHashValue base is not named in the
  // initializer list and is therefore default-constructed.
  HashValue(const HashValue& other)
    : v_(other.v_)
  {}

  uint64_t v_;
};
|
|
|
|
static uint64_t value_alloc CACHE_ALIGNED;
|
|
static uint64_t value_free CACHE_ALIGNED;
|
|
static uint64_t node_alloc CACHE_ALIGNED;
|
|
static uint64_t node_free CACHE_ALIGNED;
|
|
|
|
ObMemAttr attr(1, ObNewModIds::OB_MEMSTORE);
|
|
|
|
static int64_t STEP = 0;
|
|
|
|
class TestAllocHandle
|
|
{
|
|
typedef LinkHashNode<HashKey> Node;
|
|
public:
|
|
TestAllocHandle() : is_inited_(true) {}
|
|
~TestAllocHandle() { ATOMIC_STORE(&is_inited_, false); }
|
|
HashValue* alloc_value()
|
|
{
|
|
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
|
|
ATOMIC_INC(&value_alloc);
|
|
HashValue* value = (HashValue*)ob_malloc(sizeof(HashValue), attr);
|
|
new(value) HashValue();
|
|
return value;
|
|
}
|
|
void free_value(HashValue* val)
|
|
{
|
|
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
|
|
ATOMIC_INC(&value_free);
|
|
val->~HashValue();
|
|
ob_free(val);
|
|
}
|
|
Node* alloc_node(HashValue* val)
|
|
{
|
|
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
|
|
UNUSED(val);
|
|
ATOMIC_INC(&node_alloc);
|
|
Node* node = (Node*)ob_malloc(sizeof(Node), attr);
|
|
new(node) Node();
|
|
return node;
|
|
}
|
|
void free_node(Node* node)
|
|
{
|
|
if (ATOMIC_LOAD(&STEP) == 1) {
|
|
IGNORE_RETURN ATOMIC_BCAS(&STEP, 1, 2);
|
|
usleep(1 * 1000 * 1000);
|
|
}
|
|
abort_unless(ATOMIC_LOAD(&is_inited_) == true);
|
|
ATOMIC_INC(&node_free);
|
|
node->~Node();
|
|
ob_free(node);
|
|
}
|
|
private:
|
|
bool is_inited_;
|
|
};
|
|
|
|
class AtomicGetFunctor
|
|
{
|
|
public:
|
|
explicit AtomicGetFunctor() : ret_val_(0) {}
|
|
void operator()(const HashKey &key, HashValue *value)
|
|
{
|
|
UNUSED(key);
|
|
ret_val_ = value->v_;
|
|
}
|
|
uint64_t ret_val_;
|
|
};
|
|
|
|
/**
 * Callback for Hashmap::for_each(): logs one key/value pair at INFO level.
 *
 * @param key   key of the visited entry
 * @param value value of the visited entry
 * @return always true
 *         NOTE(review): presumably true tells for_each() to keep iterating -- confirm.
 */
static bool print(HashKey &key, HashValue *value)
{
  // `key` is read below, so the former UNUSED(key) marker was misleading and has been dropped.
  _OB_LOG(INFO, "key=%lu val=%lu", key.v_, value->v_);
  return true;
}
|
|
|
|
// Map type under test: HashKey -> HashValue, with memory managed through TestAllocHandle.
using Hashmap = ObLinkHashMap<HashKey, HashValue, TestAllocHandle>;
|
|
|
|
// Single-threaded walk through the map API: create, insert_and_get, del, get,
// contains_key, for_each and operate, followed by a leak check on the
// allocation counters. The "k:v" comments track the expected map content.
//
// Steps whose failure would make the next statement dereference a null
// pointer use ASSERT_* (fatal) instead of EXPECT_* so a broken map fails the
// test instead of crashing the whole test binary.
TEST(TestObHashMap, Feature)
{
  Hashmap hm;
  HashKey key;
  HashValue *val_ptr = nullptr;

  ASSERT_EQ(OB_SUCCESS, hm.init());

  // insert
  key.v_ = 1;
  EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
  hm.revert(val_ptr);
  // 1:0

  // insert key of different bucket
  key.v_ = 2;
  EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
  hm.revert(val_ptr);
  // 1:0, 2:0

  // insert key of common bucket
  key.v_ = 1;
  EXPECT_EQ(OB_ENTRY_EXIST, hm.create(key, val_ptr));
  // 1:0, 2:0

  // insert
  key.v_ = 3;
  ASSERT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
  ASSERT_TRUE(nullptr != val_ptr);
  val_ptr->v_ = 1;
  EXPECT_EQ(OB_SUCCESS, hm.insert_and_get(key, val_ptr));
  hm.revert(val_ptr);
  // 1:0, 2:0, 3:1

  // insert key of different bucket
  key.v_ = 4;
  ASSERT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
  ASSERT_TRUE(nullptr != val_ptr);
  val_ptr->v_ = 1;
  EXPECT_EQ(OB_SUCCESS, hm.insert_and_get(key, val_ptr));
  hm.revert(val_ptr);
  // 1:0, 2:0, 3:1, 4:1

  // insert key of common bucket
  key.v_ = 3;
  ASSERT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
  ASSERT_TRUE(nullptr != val_ptr);
  val_ptr->v_ = 2;
  EXPECT_EQ(OB_ENTRY_EXIST, hm.insert_and_get(key, val_ptr));
  // The insert was rejected, so the freshly allocated value is released by hand.
  hm.free_value(val_ptr);
  // 1:0, 2:0, 3:1, 4:1

  // delete existing key
  key.v_ = 1;
  EXPECT_EQ(OB_SUCCESS, hm.del(key));
  // 2:0, 3:1, 4:1

  // delete not existing key
  key.v_ = 5;
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.del(key));
  // 2:0, 3:1, 4:1

  // query existing key
  key.v_ = 2;
  val_ptr = nullptr;
  ASSERT_EQ(OB_SUCCESS, hm.get(key, val_ptr));
  ASSERT_TRUE(nullptr != val_ptr);
  val_ptr->v_ += 1;
  hm.revert(val_ptr);
  // 2:1, 3:1, 4:1

  // query not existing key
  key.v_ = 6;
  val_ptr = nullptr;
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.get(key, val_ptr));
  // 2:1, 3:1, 4:1

  // query existing key
  key.v_ = 3;
  EXPECT_EQ(OB_ENTRY_EXIST, hm.contains_key(key));
  // 2:1, 3:1, 4:1

  // query not existing key
  key.v_ = 7;
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.contains_key(key));
  // 2:1, 3:1, 4:1

  EXPECT_EQ(OB_SUCCESS, hm.for_each(print));
  // 2:1, 3:1, 4:1

  AtomicGetFunctor fn;
  key.v_ = 2;
  EXPECT_EQ(OB_SUCCESS, hm.operate(key, fn));
  // ret_val_ is uint64_t; compare against an unsigned constant to avoid a
  // signed/unsigned comparison inside the assertion.
  EXPECT_EQ(static_cast<uint64_t>(1), fn.ret_val_);

  hm.reset();
  hm.purge();

  EXPECT_EQ(0, hm.count());
  EXPECT_EQ(0, hm.size());
  // Everything that was allocated must have been released by now.
  EXPECT_EQ(value_free, value_alloc);
  EXPECT_EQ(node_free, node_alloc);
}
|
|
|
|
// Concurrent insert / delete / lookup stress test.
//
// - 8 insert threads each create a contiguous range of 2048 keys.
// - 8 delete threads remove the keys (i / 2) + j * THREAD_COUNT, so threads
//   2k and 2k+1 race on the same keys; together they target the keys whose
//   value modulo 8 is 0..3, i.e. half of all inserted keys.
// - 8 reader threads concurrently call get() / contains_key().
//
// db[] records per key how many of {insert, successful delete} happened; once
// all threads are joined every slot must be 1 (inserted only) or 2 (inserted
// and deleted), split evenly.
TEST(TestObHashMap, Stress)
{
  constexpr int64_t THREAD_COUNT = 8;
  constexpr int64_t DATA_COUNT_PER_THREAD = 2048;

  Hashmap hm(1024);
  // 0 for not inserted, 1 for inserted, 2 for inserted and then deleted
  int db[DATA_COUNT_PER_THREAD * THREAD_COUNT] CACHE_ALIGNED;

  memset(db, 0, sizeof(db));
  // Fatal on purpose: no thread has been started yet, and running the
  // workload against a map that failed to initialise is pointless.
  ASSERT_EQ(OB_SUCCESS, hm.init());

  // insert contiguous DATA_COUNT_PER_THREAD nodes in each thread
  std::thread insert_threads[THREAD_COUNT];
  for (auto i = 0; i < THREAD_COUNT; ++i) {
    insert_threads[i] = std::thread([&, i]() {
      HashKey key;
      HashValue *val_ptr = nullptr;
      for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
        key.v_ = j + i * DATA_COUNT_PER_THREAD;
        EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
        ATOMIC_INC(db + key.v_);
        hm.revert(val_ptr);
        val_ptr = nullptr;
      }
    });
  }
  // delete non-contiguous DATA_COUNT_PER_THREAD nodes; every key is targeted
  // by two threads, so half of the delete attempts are duplicates
  std::thread del_threads[THREAD_COUNT];
  for (auto i = 0; i < THREAD_COUNT; ++i) {
    del_threads[i] = std::thread([&, i]() {
      HashKey key;
      for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
        int ret = OB_SUCCESS;
        key.v_ = i / 2 + j * THREAD_COUNT;
        // Retry until this thread deletes the key itself, or until db[key]
        // reaches 2, which means the key was inserted and the competing
        // thread already deleted it. OB_FAIL is expected to leave del()'s
        // result in `ret`, which is inspected right after the loop.
        while (OB_FAIL(hm.del(key)) && ATOMIC_LOAD(db + key.v_) < 2)
          ;
        if (OB_SUCC(ret)) {
          ATOMIC_INC(db + key.v_);
        } else {
          EXPECT_EQ(OB_ENTRY_NOT_EXIST, ret);
        }
      }
    });
  }

  // First half of the readers: get() + revert() on a contiguous key range.
  std::thread get_threads[THREAD_COUNT];
  for (auto i = 0; i < THREAD_COUNT / 2; ++i) {
    get_threads[i] = std::thread([&, i]() {
      int ret = OB_SUCCESS;
      HashKey key;
      HashValue *val_ptr = nullptr;
      for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
        key.v_ = j + i * DATA_COUNT_PER_THREAD;
        if (OB_SUCC(hm.get(key, val_ptr))) {
          hm.revert(val_ptr);
        }
        val_ptr = nullptr;
      }
    });
  }
  // Second half of the readers: contains_key() only, result ignored.
  for (auto i = THREAD_COUNT / 2; i < THREAD_COUNT; ++i) {
    get_threads[i] = std::thread([&, i]() {
      HashKey key;
      for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
        key.v_ = j + i * DATA_COUNT_PER_THREAD;
        UNUSED(hm.contains_key(key));
      }
    });
  }

  for (auto i = 0; i < THREAD_COUNT; ++i) {
    insert_threads[i].join();
  }

  for (auto i = 0; i < THREAD_COUNT; ++i) {
    del_threads[i].join();
  }

  for (auto i = 0; i < THREAD_COUNT; ++i) {
    get_threads[i].join();
  }

  // Half of the keys were deleted, the other half must still be present.
  EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, hm.size());
  int counter[3];
  memset(counter, 0, sizeof(counter));
  for (int i = 0; i < DATA_COUNT_PER_THREAD * THREAD_COUNT; ++i) {
    // The range checks also keep the counter[] index below in bounds.
    ASSERT_TRUE(db[i] > 0);
    ASSERT_TRUE(db[i] <= 2);
    ++counter[db[i]];
  }
  EXPECT_EQ(0, counter[0]);
  EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, counter[1]);
  EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, counter[2]);

  hm.reset();
  hm.purge();

  EXPECT_EQ(0, hm.count());
  EXPECT_EQ(0, hm.size());
  // Everything that was allocated must have been released by now.
  EXPECT_EQ(value_free, value_alloc);
  EXPECT_EQ(node_free, node_alloc);
}
|
|
|
|
// Checks that a map can be destroyed while one of its nodes is still being
// freed on another thread.
//
// Map A creates and deletes one key, then STEP is set to 1. A second thread
// works on its own map B; the first TestAllocHandle::free_node() call that
// observes STEP == 1 flips it to 2 and sleeps for a second before checking
// that its handle is still alive. This test waits for STEP == 2 and then
// leaves A's scope, so A is destructed while that free_node() is in flight.
// NOTE(review): presumably B's create() is what triggers the reclamation of
// A's deleted node -- confirm against the ObLinkHashMap implementation.
TEST(TestObHashMap, Retire)
{
  // Held by value: joined below, no manual new/delete required.
  std::thread t;
  {
    Hashmap A;
    HashKey key;
    HashValue *val_ptr = nullptr;
    EXPECT_EQ(OB_SUCCESS, A.init());
    key.v_ = 1;
    EXPECT_EQ(OB_SUCCESS, A.create(key, val_ptr));
    A.revert(val_ptr);
    EXPECT_EQ(OB_SUCCESS, A.del(key));
    // Arm the handshake. A store (rather than an increment) keeps the test
    // re-runnable: after a previous run STEP is left at 2, and incrementing
    // it to 3 would make the wait loop below spin forever.
    ATOMIC_STORE(&STEP, 1);
    // The thread outlives this scope, so it must not reference any local
    // declared here; it uses its own key and value pointer.
    t = std::thread([]() {
      usleep(10 * 1000);
      Hashmap B;
      HashKey key;
      HashValue *val_ptr = nullptr;
      EXPECT_EQ(OB_SUCCESS, B.init());
      key.v_ = 1;
      EXPECT_EQ(OB_SUCCESS, B.create(key, val_ptr));
      B.revert(val_ptr);
    });
    // Wait until free_node() has signalled that it is in progress.
    while (ATOMIC_LOAD(&STEP) != 2);
    // ~A()
  }
  t.join();
}
|
|
|
|
/*
TEST(TestObHashMap, Dummy)
{
  struct Fn {
    bool operator()(HashKey& key, HashValue* value) {
      return true;
    }
  };
  Fn fn;
  bool stop = false;
  constexpr int64_t THREAD_COUNT = 8;
  constexpr int64_t DATA_COUNT_PER_THREAD = 81920;
  Hashmap hm(2);
  EXPECT_EQ(OB_SUCCESS, hm.init());
  std::thread insert_threads[THREAD_COUNT];
  int64_t current = 0;
  for (auto i = 0; i < THREAD_COUNT; ++i) {
    insert_threads[i] = std::thread([&]() {
      HashKey key;
      HashValue *val_ptr = nullptr;
      while (ATOMIC_LOAD(&current) < THREAD_COUNT * DATA_COUNT_PER_THREAD) {
        key.v_ = ATOMIC_FAA(&current, 1);
        EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
        hm.revert(val_ptr);
        val_ptr = nullptr;
      }
    });
  }
  std::thread foreach_threads[THREAD_COUNT];
  for (auto i = 0; i < THREAD_COUNT; ++i) {
    foreach_threads[i] = std::thread([&]() {
      while (!stop) {
        hm.for_each(fn);
      }
    });
  }
  for (auto i = 0; i < THREAD_COUNT; ++i) {
    insert_threads[i].join();
  }
  hm.reset();
  hm.purge();
  ATOMIC_STORE(&stop, true);
  for (auto i = 0; i < THREAD_COUNT; ++i) {
    foreach_threads[i].join();
  };
}
*/
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
testing::InitGoogleTest(&argc,argv);
|
|
oceanbase::common::ObLogger::get_logger().set_file_name("test_link_hashmap.log", true);
|
|
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
|
|
return RUN_ALL_TESTS();
|
|
}
|