fix hash group by oom

This commit is contained in:
ls0
2021-11-09 15:12:51 +08:00
committed by LINxiansheng
parent 878c594b56
commit 04ecb035fe
7 changed files with 157 additions and 78 deletions

View File

@ -20,6 +20,7 @@
#include "sql/engine/basic/ob_chunk_row_store.h"
#include "lib/container/ob_2d_array.h"
#include "sql/engine/basic/ob_chunk_datum_store.h"
#include "sql/engine/ob_sql_mem_mgr_processor.h"
namespace oceanbase {
namespace common {
@ -35,14 +36,17 @@ class ObExtendHashTable {
public:
const static int64_t INITIAL_SIZE = 128;
const static int64_t SIZE_BUCKET_SCALE = 4;
ObExtendHashTable() : initial_bucket_num_(0), size_(0), buckets_(NULL), allocator_(NULL)
const static int64_t MAX_MEM_PERCENT = 40;
ObExtendHashTable() : initial_bucket_num_(0), size_(0), buckets_(NULL), allocator_(NULL),
sql_mem_processor_(nullptr)
{}
~ObExtendHashTable()
{
destroy();
}
int init(ObIAllocator* allocator, lib::ObMemAttr& mem_attr, int64_t initial_size = INITIAL_SIZE);
int init(ObIAllocator* allocator, lib::ObMemAttr& mem_attr,
ObSqlMemMgrProcessor *sql_mem_processor, int64_t initial_size = INITIAL_SIZE);
bool is_inited() const
{
return NULL != buckets_;
@ -70,7 +74,7 @@ public:
size_ = 0;
}
int resize(ObIAllocator* allocator, int64_t bucket_num);
int resize(ObIAllocator *allocator, int64_t bucket_num, ObSqlMemMgrProcessor *sql_mem_processor);
void destroy()
{
@ -82,6 +86,7 @@ public:
allocator_.set_allocator(nullptr);
size_ = 0;
initial_bucket_num_ = 0;
sql_mem_processor_ = nullptr;
}
int64_t mem_used() const
{
@ -116,6 +121,9 @@ public:
protected:
DISALLOW_COPY_AND_ASSIGN(ObExtendHashTable);
int extend();
int64_t estimate_bucket_num(
const int64_t bucket_num,
const int64_t max_hash_mem);
protected:
lib::ObMemAttr mem_attr_;
@ -124,11 +132,34 @@ protected:
using BucketArray = common::ObSegmentArray<Item*, OB_MALLOC_BIG_BLOCK_SIZE, common::ModulePageAllocator>;
BucketArray* buckets_;
common::ModulePageAllocator allocator_;
ObSqlMemMgrProcessor *sql_mem_processor_;
};
template <typename Item>
int64_t ObExtendHashTable<Item>::estimate_bucket_num(
const int64_t bucket_num,
const int64_t max_hash_mem)
{
int64_t max_bound_size = max_hash_mem * MAX_MEM_PERCENT / 100;
int64_t est_bucket_num = common::next_pow2(bucket_num);
int64_t est_size = est_bucket_num * sizeof(void*);
while (est_size > max_bound_size) {
est_bucket_num >>= 1;
est_size = est_bucket_num * sizeof(void*);
}
if (est_bucket_num < INITIAL_SIZE) {
est_bucket_num = INITIAL_SIZE;
}
return est_bucket_num;
}
template <typename Item>
int ObExtendHashTable<Item>::init(
ObIAllocator* allocator, lib::ObMemAttr& mem_attr, const int64_t initial_size /* INITIAL_SIZE */)
ObIAllocator *allocator,
lib::ObMemAttr &mem_attr,
ObSqlMemMgrProcessor *sql_mem_processor,
const int64_t initial_size /* INITIAL_SIZE */)
{
int ret = common::OB_SUCCESS;
if (initial_size < 2) {
@ -136,6 +167,7 @@ int ObExtendHashTable<Item>::init(
SQL_ENG_LOG(WARN, "invalid argument", K(ret));
} else {
mem_attr_ = mem_attr;
sql_mem_processor_ = sql_mem_processor;
allocator_.set_allocator(allocator);
allocator_.set_label(mem_attr.label_);
void* buckets_buf = NULL;
@ -157,12 +189,13 @@ int ObExtendHashTable<Item>::init(
}
template <typename Item>
int ObExtendHashTable<Item>::resize(ObIAllocator* allocator, int64_t bucket_num)
int ObExtendHashTable<Item>::resize(ObIAllocator* allocator, int64_t bucket_num,
ObSqlMemMgrProcessor *sql_mem_processor)
{
int ret = OB_SUCCESS;
if (bucket_num < get_bucket_num() / 2) {
destroy();
if (OB_FAIL(init(allocator, mem_attr_, bucket_num))) {
if (OB_FAIL(init(allocator, mem_attr_, sql_mem_processor, bucket_num))) {
SQL_ENG_LOG(WARN, "failed to reuse with bucket", K(bucket_num), K(ret));
}
} else {
@ -222,46 +255,52 @@ int ObExtendHashTable<Item>::extend()
{
common::hash::hash_func<Item> hf;
int ret = common::OB_SUCCESS;
const int64_t new_bucket_num =
0 == get_bucket_num() ? (0 == initial_bucket_num_ ? INITIAL_SIZE : initial_bucket_num_) : get_bucket_num() * 2;
BucketArray* new_buckets = NULL;
void* buckets_buf = NULL;
if (OB_ISNULL(buckets_buf = allocator_.alloc(sizeof(BucketArray), mem_attr_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
SQL_ENG_LOG(WARN, "failed to allocate memory", K(ret));
int64_t pre_bucket_num = get_bucket_num();
int64_t new_bucket_num = 0 == pre_bucket_num ?
(0 == initial_bucket_num_ ? INITIAL_SIZE : initial_bucket_num_)
: pre_bucket_num * 2;
new_bucket_num = estimate_bucket_num(new_bucket_num, sql_mem_processor_->get_mem_bound());
if (new_bucket_num <= pre_bucket_num) {
} else {
new_buckets = new (buckets_buf) BucketArray(allocator_);
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_ISNULL(buckets_)) {
ret = OB_INVALID_ARGUMENT;
SQL_ENG_LOG(WARN, "invalid argument", K(ret), K(buckets_));
} else if (OB_FAIL(new_buckets->init(new_bucket_num))) {
SQL_ENG_LOG(WARN, "resize bucket array failed", K(ret), K(new_bucket_num));
} else {
for (int64_t i = 0; i < get_bucket_num(); i++) {
Item* bucket = buckets_->at(i);
while (bucket != NULL) {
Item* item = bucket;
bucket = bucket->next();
Item*& new_bucket = new_buckets->at(hf(*item) & (new_bucket_num - 1));
item->next() = new_bucket;
new_bucket = item;
}
BucketArray* new_buckets = NULL;
void* buckets_buf = NULL;
if (OB_ISNULL(buckets_buf = allocator_.alloc(sizeof(BucketArray), mem_attr_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
SQL_ENG_LOG(WARN, "failed to allocate memory", K(ret));
} else {
new_buckets = new (buckets_buf) BucketArray(allocator_);
}
buckets_->destroy();
allocator_.free(buckets_);
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_ISNULL(buckets_)) {
ret = OB_INVALID_ARGUMENT;
SQL_ENG_LOG(WARN, "invalid argument", K(ret), K(buckets_));
} else if (OB_FAIL(new_buckets->init(new_bucket_num))) {
SQL_ENG_LOG(WARN, "resize bucket array failed", K(ret), K(new_bucket_num));
} else {
for (int64_t i = 0; i < get_bucket_num(); i++) {
Item* bucket = buckets_->at(i);
while (bucket != NULL) {
Item* item = bucket;
bucket = bucket->next();
Item*& new_bucket = new_buckets->at(hf(*item) & (new_bucket_num - 1));
item->next() = new_bucket;
new_bucket = item;
}
}
buckets_->destroy();
allocator_.free(buckets_);
buckets_ = new_buckets;
}
if (OB_FAIL(ret)) {
if (buckets_ == new_buckets) {
SQL_ENG_LOG(ERROR, "unexpected status: failed allocate new bucket", K(ret));
} else if (nullptr != new_buckets) {
new_buckets->destroy();
allocator_.free(new_buckets);
new_buckets = nullptr;
buckets_ = new_buckets;
}
if (OB_FAIL(ret)) {
if (buckets_ == new_buckets) {
SQL_ENG_LOG(ERROR, "unexpected status: failed allocate new bucket", K(ret));
} else if (nullptr != new_buckets) {
new_buckets->destroy();
allocator_.free(new_buckets);
new_buckets = nullptr;
}
}
}
return ret;