363 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			363 lines
		
	
	
		
			9.0 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #include <thread>
 | |
| #include "lib/hash/ob_link_hashmap.h"
 | |
| #include "lib/hash/ob_hashutils.h"
 | |
| #include "lib/allocator/ob_malloc.h"
 | |
| #include "lib/utility/ob_print_utils.h"
 | |
| 
 | |
| #include "gtest/gtest.h"
 | |
| 
 | |
| using namespace oceanbase;
 | |
| using namespace common;
 | |
| using namespace hash;
 | |
| 
 | |
| class HashKey
 | |
| {
 | |
| public:
 | |
|   HashKey(): v_(0) {}
 | |
|   HashKey(uint64_t v): v_(v) {}
 | |
|   HashKey(const HashKey& that): v_(that.v_) {}
 | |
|   int compare(const HashKey& that) {
 | |
|     int ret = 0;
 | |
|     if (v_ > that.v_) {
 | |
|       ret = 1;
 | |
|     } else if (v_ < that.v_) {
 | |
|       ret = -1;
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
|   uint64_t hash() const { return v_; }
 | |
|   uint64_t v_;
 | |
| };
 | |
| 
 | |
| class HashValue : public LinkHashValue<HashKey>
 | |
| {
 | |
| public:
 | |
|   HashValue(): v_(0) {}
 | |
|   HashValue(uint64_t v): v_(v) {}
 | |
|   HashValue(const HashValue& that): v_(that.v_) {}
 | |
|   uint64_t v_;
 | |
| };
 | |
| 
 | |
| static uint64_t value_alloc CACHE_ALIGNED;
 | |
| static uint64_t value_free CACHE_ALIGNED;
 | |
| static uint64_t node_alloc CACHE_ALIGNED;
 | |
| static uint64_t node_free CACHE_ALIGNED;
 | |
| 
 | |
| ObMemAttr attr(1001, ObNewModIds::OB_MEMSTORE);
 | |
| 
 | |
| static int64_t STEP = 0;
 | |
| 
 | |
| class TestAllocHandle
 | |
| {
 | |
|   typedef LinkHashNode<HashKey> Node;
 | |
| public:
 | |
|   TestAllocHandle() : is_inited_(true) {}
 | |
|   ~TestAllocHandle() { ATOMIC_STORE(&is_inited_, false); }
 | |
|   HashValue* alloc_value()
 | |
|   {
 | |
|     abort_unless(ATOMIC_LOAD(&is_inited_) == true);
 | |
|     ATOMIC_INC(&value_alloc);
 | |
|     HashValue* value = (HashValue*)ob_malloc(sizeof(HashValue), attr);
 | |
|     new(value) HashValue();
 | |
|     return value;
 | |
|   }
 | |
|   void free_value(HashValue* val)
 | |
|   {
 | |
|     abort_unless(ATOMIC_LOAD(&is_inited_) == true);
 | |
|     ATOMIC_INC(&value_free);
 | |
|     val->~HashValue();
 | |
|     ob_free(val);
 | |
|   }
 | |
|   Node* alloc_node(HashValue* val)
 | |
|   {
 | |
|     abort_unless(ATOMIC_LOAD(&is_inited_) == true);
 | |
|     UNUSED(val);
 | |
|     ATOMIC_INC(&node_alloc);
 | |
|     Node* node = (Node*)ob_malloc(sizeof(Node), attr);
 | |
|     new(node) Node();
 | |
|     return node;
 | |
|   }
 | |
|   void free_node(Node* node)
 | |
|   {
 | |
|     if (ATOMIC_LOAD(&STEP) == 1) {
 | |
|       IGNORE_RETURN ATOMIC_BCAS(&STEP, 1, 2);
 | |
|       usleep(1 * 1000 * 1000);
 | |
|     }
 | |
|     abort_unless(ATOMIC_LOAD(&is_inited_) == true);
 | |
|     ATOMIC_INC(&node_free);
 | |
|     node->~Node();
 | |
|     ob_free(node);
 | |
|   }
 | |
| private:
 | |
|   bool is_inited_;
 | |
| };
 | |
| 
 | |
| class AtomicGetFunctor
 | |
| {
 | |
| public:
 | |
|   explicit AtomicGetFunctor() : ret_val_(0) {}
 | |
|   void operator()(const HashKey &key, HashValue *value)
 | |
|   {
 | |
|     UNUSED(key);
 | |
|     ret_val_ = value->v_;
 | |
|   }
 | |
|   uint64_t ret_val_;
 | |
| };
 | |
| 
 | |
| static bool print(HashKey &key, HashValue *value) { UNUSED(key); _OB_LOG(INFO, "key=%lu val=%lu", key.v_, value->v_); return true; }
 | |
| 
 | |
| typedef ObLinkHashMap<HashKey, HashValue, TestAllocHandle> Hashmap;
 | |
| 
 | |
| TEST(TestObHashMap, Feature)
 | |
| {
 | |
|   Hashmap hm;
 | |
|   HashKey key;
 | |
|   HashValue *val_ptr = nullptr;
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.init());
 | |
| 
 | |
|   // insert
 | |
|   key.v_ = 1;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
 | |
|   hm.revert(val_ptr);
 | |
|   // 1:0
 | |
| 
 | |
|   // insert key of different bucket
 | |
|   key.v_ = 2;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
 | |
|   hm.revert(val_ptr);
 | |
|   // 1:0, 2:0
 | |
| 
 | |
|   // insert key of common bucket
 | |
|   key.v_ = 1;
 | |
|   EXPECT_EQ(OB_ENTRY_EXIST, hm.create(key, val_ptr));
 | |
|   // 1:0, 2:0
 | |
| 
 | |
|   // insert
 | |
|   key.v_ = 3;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
 | |
|   val_ptr->v_ = 1;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.insert_and_get(key, val_ptr));
 | |
|   hm.revert(val_ptr);
 | |
|   // 1:0, 2:0, 3:1
 | |
| 
 | |
|   // insert key of different bucket
 | |
|   key.v_ = 4;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
 | |
|   val_ptr->v_ = 1;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.insert_and_get(key, val_ptr));
 | |
|   hm.revert(val_ptr);
 | |
|   // 1:0, 2:0, 3:1, 4:1
 | |
| 
 | |
|   // insert key of common bucket
 | |
|   key.v_ = 3;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.alloc_value(val_ptr));
 | |
|   val_ptr->v_ = 2;
 | |
|   EXPECT_EQ(OB_ENTRY_EXIST, hm.insert_and_get(key, val_ptr));
 | |
|   hm.free_value(val_ptr);
 | |
|   // 1:0, 2:0, 3:1, 4:1
 | |
| 
 | |
|   // delete existing key
 | |
|   key.v_ = 1;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.del(key));
 | |
|   // 2:0, 3:1, 4:1
 | |
| 
 | |
|   // delete not existing key
 | |
|   key.v_ = 5;
 | |
|   EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.del(key));
 | |
|   // 2:0, 3:1, 4:1
 | |
| 
 | |
|   // query existing key
 | |
|   key.v_ = 2;
 | |
|   val_ptr = nullptr;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.get(key, val_ptr));
 | |
|   val_ptr->v_ +=1 ;
 | |
|   hm.revert(val_ptr);
 | |
|   // 2:1, 3:1, 4:1
 | |
| 
 | |
|   // query not existing key
 | |
|   key.v_ = 6;
 | |
|   val_ptr = nullptr;
 | |
|   EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.get(key, val_ptr));
 | |
|   // 2:1, 3:1, 4:1
 | |
| 
 | |
|   // query existing key
 | |
|   key.v_ = 3;
 | |
|   EXPECT_EQ(OB_ENTRY_EXIST, hm.contains_key(key));
 | |
|   // 2:1, 3:1, 4:1
 | |
| 
 | |
|   // query not existing key
 | |
|   key.v_ = 7;
 | |
|   EXPECT_EQ(OB_ENTRY_NOT_EXIST, hm.contains_key(key));
 | |
|   // 2:1, 3:1, 4:1
 | |
| 
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.for_each(print));
 | |
|   // 2:1, 3:1, 4:1
 | |
| 
 | |
|   AtomicGetFunctor fn;
 | |
|   key.v_ = 2;
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.operate(key, fn));
 | |
|   EXPECT_EQ(1, fn.ret_val_);
 | |
| 
 | |
|   hm.reset();
 | |
|   hm.purge();
 | |
| 
 | |
|   EXPECT_EQ(0, hm.count());
 | |
|   EXPECT_EQ(0, hm.size());
 | |
|   EXPECT_EQ(value_free, value_alloc);
 | |
|   EXPECT_EQ(node_free, node_alloc);
 | |
| }
 | |
| 
 | |
| TEST(TestObHashMap, Stress)
 | |
| {
 | |
|   constexpr int64_t THREAD_COUNT = 8;
 | |
|   constexpr int64_t DATA_COUNT_PER_THREAD = 2048;
 | |
| 
 | |
|   Hashmap hm(1024);
 | |
|   // 0 for not inserted, 1 for inserted, 2 for inserted but deleted soon
 | |
|   int db[DATA_COUNT_PER_THREAD * THREAD_COUNT] CACHE_ALIGNED;
 | |
| 
 | |
|   memset(db, 0, sizeof(db));
 | |
|   EXPECT_EQ(OB_SUCCESS, hm.init());
 | |
| 
 | |
|   // insert continuous DATA_COUNT_PER_THREAD nodes in each thread
 | |
|   std::thread insert_threads[THREAD_COUNT];
 | |
|   for (auto i = 0; i < THREAD_COUNT; ++i) {
 | |
|     insert_threads[i] = std::thread([&, i]() {
 | |
|       HashKey key;
 | |
|       HashValue *val_ptr = nullptr;
 | |
|       for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
 | |
|         key.v_ = j + i * DATA_COUNT_PER_THREAD;
 | |
|         EXPECT_EQ(OB_SUCCESS, hm.create(key, val_ptr));
 | |
|         ATOMIC_INC(db + key.v_);
 | |
|         hm.revert(val_ptr);
 | |
|         val_ptr = nullptr;
 | |
|       }
 | |
|     });
 | |
|   }
 | |
|   // delete incontinuous DATA_COUNT_PER_THREAD nodes, half of them are duplicated
 | |
|   std::thread del_threads[THREAD_COUNT];
 | |
|   for (auto i = 0; i < THREAD_COUNT; ++i) {
 | |
|     del_threads[i] = std::thread([&, i]() {
 | |
|       HashKey key;
 | |
|       for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
 | |
|         int ret = OB_SUCCESS;
 | |
|         key.v_ = i / 2 + j * THREAD_COUNT;
 | |
|         while (OB_FAIL(hm.del(key)) && ATOMIC_LOAD(db + key.v_) < 2)
 | |
|           ;
 | |
|         if (OB_SUCC(ret)) {
 | |
|           ATOMIC_INC(db + key.v_);
 | |
|         } else {
 | |
|           EXPECT_EQ(OB_ENTRY_NOT_EXIST, ret);
 | |
|         }
 | |
|       }
 | |
|     });
 | |
|   }
 | |
| 
 | |
|   std::thread get_threads[THREAD_COUNT];
 | |
|   for (auto i = 0; i < THREAD_COUNT / 2; ++i) {
 | |
|     get_threads[i] = std::thread([&, i]() {
 | |
|       int ret = OB_SUCCESS;
 | |
|       HashKey key;
 | |
|       HashValue *val_ptr = nullptr;
 | |
|       for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
 | |
|         key.v_ = j + i * DATA_COUNT_PER_THREAD;
 | |
|         if (OB_SUCC(hm.get(key, val_ptr))) {
 | |
|           hm.revert(val_ptr);
 | |
|         }
 | |
|         val_ptr = nullptr;
 | |
|       }
 | |
|     });
 | |
|   }
 | |
|   for (auto i = THREAD_COUNT / 2; i < THREAD_COUNT; ++i) {
 | |
|     get_threads[i] = std::thread([&, i]() {
 | |
|       HashKey key;
 | |
|       for (auto j = 0; j < DATA_COUNT_PER_THREAD; ++j) {
 | |
|         key.v_ = j + i * DATA_COUNT_PER_THREAD;
 | |
|         UNUSED(hm.contains_key(key));
 | |
|       }
 | |
|     });
 | |
|   }
 | |
| 
 | |
|   for (auto i = 0; i < THREAD_COUNT; ++i) {
 | |
|     insert_threads[i].join();
 | |
|   }
 | |
| 
 | |
|   for (auto i = 0; i < THREAD_COUNT; ++i) {
 | |
|     del_threads[i].join();
 | |
|   }
 | |
| 
 | |
|   for (auto i = 0; i < THREAD_COUNT; ++i) {
 | |
|     get_threads[i].join();
 | |
|   }
 | |
| 
 | |
|   EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, hm.size());
 | |
|   int counter[3];
 | |
|   memset(counter, 0, sizeof(counter));
 | |
|   for (int i = 0; i < DATA_COUNT_PER_THREAD * THREAD_COUNT; ++i) {
 | |
|     ASSERT_TRUE(db[i] > 0);
 | |
|     ASSERT_TRUE(db[i] <= 2);
 | |
|     ++counter[db[i]];
 | |
|   }
 | |
|   EXPECT_EQ(0, counter[0]);
 | |
|   EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, counter[1]);
 | |
|   EXPECT_EQ(DATA_COUNT_PER_THREAD * THREAD_COUNT / 2, counter[2]);
 | |
| 
 | |
|   hm.reset();
 | |
|   hm.purge();
 | |
| 
 | |
|   EXPECT_EQ(0, hm.count());
 | |
|   EXPECT_EQ(0, hm.size());
 | |
|   EXPECT_EQ(value_free, value_alloc);
 | |
|   EXPECT_EQ(node_free, node_alloc);
 | |
| }
 | |
| 
 | |
| TEST(TestObHashMap, Retire)
 | |
| {
 | |
|   std::thread* t;
 | |
|   {
 | |
|     Hashmap A;
 | |
|     HashKey key;
 | |
|     HashValue *val_ptr = nullptr;
 | |
|     EXPECT_EQ(OB_SUCCESS, A.init());
 | |
|     key.v_ = 1;
 | |
|     EXPECT_EQ(OB_SUCCESS, A.create(key, val_ptr));
 | |
|     A.revert(val_ptr);
 | |
|     EXPECT_EQ(OB_SUCCESS, A.del(key));
 | |
|     ATOMIC_INC(&STEP);
 | |
|     t = new std::thread([&]() {
 | |
|       usleep(10 * 1000);
 | |
|       Hashmap B;
 | |
|       HashKey key;
 | |
|       EXPECT_EQ(OB_SUCCESS, B.init());
 | |
|       key.v_ = 1;
 | |
|       EXPECT_EQ(OB_SUCCESS, B.create(key, val_ptr));
 | |
|       B.revert(val_ptr);
 | |
|     });
 | |
|     while(ATOMIC_LOAD(&STEP) != 2);
 | |
|     // ~A()
 | |
|   }
 | |
|   t->join();
 | |
|   delete t;
 | |
| }
 | |
| 
 | |
| int main(int argc, char **argv)
 | |
| {
 | |
|   testing::InitGoogleTest(&argc,argv);
 | |
|   oceanbase::common::ObLogger::get_logger().set_file_name("test_link_hashmap.log", true);
 | |
|   oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | 
