Use batch hash in encoder
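
Replace the per-row murmur hash in ObEncodingHashTableBuilder::build() with the
vectorized murmur_hash_v2_batch_. Rows are visited one continuous datum array at
a time: null and nop rows are recorded in a new skip bit vector so the batch
hash only touches plain values, and the resulting hashes are buffered in a new
hash_val_ array before bucket insertion. Store classes that need binary hashing
(text, JSON, LOB, geometry, roaring bitmap) keep the scalar path.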

DengzhiLiu
2024-07-03 09:49:32 +00:00
committed by ob-robot
parent 147a312d13
commit b042b281ef
4 changed files with 152 additions and 44 deletions

View File

@@ -25,8 +25,9 @@ using namespace common;
ObEncodingHashTable::ObEncodingHashTable() : is_created_(false), bucket_num_(0),
node_num_(0), list_num_(0), node_cnt_(0), list_cnt_(0), buckets_(NULL), nodes_(NULL),
lists_(NULL), alloc_(blocksstable::OB_ENCODING_LABEL_HASH_TABLE, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
{
lists_(NULL), skip_bit_(NULL), hash_val_(NULL),
alloc_(blocksstable::OB_ENCODING_LABEL_HASH_TABLE, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
{
MEMSET(&null_nodes_, 0, sizeof(null_nodes_));
MEMSET(&nope_nodes_, 0, sizeof(nope_nodes_));
}
@@ -58,6 +59,7 @@ int ObEncodingHashTable::create(const int64_t bucket_num, const int64_t node_num
const int64_t bucket_size = bucket_num_ * static_cast<int64_t>(sizeof(HashBucket));
const int64_t nodes_size = node_num_ * static_cast<int64_t>(sizeof(HashNode));
const int64_t lists_size = list_num_ * static_cast<int64_t>(sizeof(NodeList));
const int64_t vec_size = sql::ObBitVector::memory_size(node_num_);
if (OB_ISNULL(buckets_ = reinterpret_cast<HashBucket *>(alloc_.alloc(bucket_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
@@ -68,10 +70,18 @@
} else if (OB_ISNULL(nodes_ = reinterpret_cast<HashNode *>(alloc_.alloc(nodes_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for nodes", K(ret), K(nodes_size));
} else if (OB_ISNULL(skip_bit_ = sql::to_bit_vector((char *)alloc_.alloc(vec_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for skip bit", K(ret), K(vec_size));
} else if (OB_ISNULL(hash_val_ = reinterpret_cast<uint64_t *>(alloc_.alloc(node_num_ * sizeof(uint64_t))))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc memory for hash val", K(ret), K_(node_num));
} else {
MEMSET(buckets_, 0, bucket_size);
MEMSET(lists_, 0, lists_size);
MEMSET(nodes_, 0, nodes_size);
MEMSET(hash_val_, 0, node_num_ * sizeof(uint64_t));
skip_bit_->init(node_num_);
is_created_ = true;
}
}
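
Note: create() now also reserves sql::ObBitVector::memory_size(node_num_) bytes
for the skip bitmap plus node_num_ * sizeof(uint64_t) bytes for the hash buffer.
As a rough illustration of the bitmap term (an assumption about ObBitVector's
layout; the diff itself only relies on memory_size/init/set/at/is_all_true):

    #include <cstdint>

    // hypothetical sizing helper: bit count rounded up to whole 64-bit words
    inline int64_t bitmap_bytes(const int64_t n_bits)
    {
      const int64_t kWordBits = 64;
      return (n_bits + kWordBits - 1) / kWordBits * static_cast<int64_t>(sizeof(uint64_t));
    }
    // e.g. a 256-node table would add bitmap_bytes(256) == 32 bytes of bitmap
    // plus 256 * 8 bytes of buffered hash values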
@@ -88,6 +98,8 @@ void ObEncodingHashTable::reset()
buckets_ = NULL;
nodes_ = NULL;
lists_ = NULL;
skip_bit_ = NULL;
hash_val_ = NULL;
MEMSET(&null_nodes_, 0, sizeof(null_nodes_));
MEMSET(&nope_nodes_, 0, sizeof(nope_nodes_));
is_created_ = false;
@@ -100,10 +112,39 @@ void ObEncodingHashTable::reuse()
// nodes no need to reuse
MEMSET(&null_nodes_, 0, sizeof(null_nodes_));
MEMSET(&nope_nodes_, 0, sizeof(nope_nodes_));
MEMSET(hash_val_, 0, node_num_ * sizeof(uint64_t));
skip_bit_->init(node_num_);
node_cnt_ = 0;
list_cnt_ = 0;
}
int ObEncodingHashTableBuilder::add_to_table(const ObDatum &datum, const int64_t pos, const int64_t row_idx)
{
int ret = OB_SUCCESS;
NodeList *list = buckets_[pos];
while (OB_SUCC(ret) && nullptr != list) {
bool is_equal = false;
if (OB_FAIL(equal(*list->header_->datum_, datum, is_equal))) {
LOG_WARN("check datum equality failed", K(ret), K(datum), KPC(list->header_->datum_));
} else if (is_equal) {
add_to_list(*list, nodes_[row_idx], datum, node_cnt_);
break;
} else {
list = list->next_;
}
}
if (OB_SUCC(ret) && nullptr == list) {
list = &lists_[list_cnt_];
list->next_ = buckets_[pos];
buckets_[pos] = list;
list->insert_ref_ = list_cnt_++;
add_to_list(*list, nodes_[row_idx], datum, node_cnt_);
}
return ret;
}
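
The new add_to_table() helper factors the bucket probe out of build(): walk the
chain at pos, append to the matching NodeList when equal() finds the datum,
otherwise claim lists_[list_cnt_] and link it at the head of the bucket. Both
the scalar fallback and the batch path below call it with a position that is
already masked into range.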
int ObEncodingHashTableBuilder::build(const ObColDatums &col_datums, const ObColDesc &col_desc)
{
int ret = common::OB_SUCCESS;
@@ -118,6 +159,7 @@ int ObEncodingHashTableBuilder::build(const ObColDatums &col_datums, const ObCol
ObObjTypeStoreClass store_class = get_store_class_map()[col_desc.col_type_.get_type_class()];
const bool need_binary_hash =
(store_class == ObTextSC || store_class == ObJsonSC || store_class == ObLobSC || store_class == ObGeometrySC || store_class == ObRoaringBitmapSC);
const bool need_batch_hash = !need_binary_hash;
bool has_lob_header = col_desc.col_type_.is_lob_storage();
ObPrecision precision = PRECISION_UNKNOWN_YET;
if (col_desc.col_type_.is_decimal_int()) {
@@ -129,52 +171,71 @@
col_desc.col_type_.get_type(), col_desc.col_type_.get_collation_type(),
col_desc.col_type_.get_scale(), lib::is_oracle_mode(), has_lob_header, precision);
ObHashFunc hash_func;
hash_func.hash_func_ = basic_funcs->murmur_hash_;
hash_func.hash_func_ = basic_funcs->murmur_hash_v2_;
hash_func.batch_hash_func_ = basic_funcs->murmur_hash_v2_batch_;
const uint64_t mask = (bucket_num_ - 1);
for (int64_t row_id = 0;
OB_SUCC(ret) && row_id < col_datums.count() && list_cnt_ < list_num_;
++row_id) {
const ObDatum &datum = col_datums.at(row_id);
if (datum.is_null()) {
add_to_list(null_nodes_, nodes_[row_id], datum);
} else if (datum.is_nop()) {
add_to_list(nope_nodes_, nodes_[row_id], datum);
} else if (datum.is_ext()) {
ret = common::OB_NOT_SUPPORTED;
STORAGE_LOG(WARN, "not supported extend object type",
K(ret), K(row_id), K(datum), K(*datum.extend_obj_));
int64_t dimension_size = col_datums.get_dimension_size();
int64_t datum_arr_cnt = col_datums.get_continuous_array_count();
int64_t datum_array_size = 0;
ObDatum *datum_arry = nullptr;
for (int64_t i = 0; OB_SUCC(ret) && i < datum_arr_cnt; i++) {
col_datums.get_continuous_array(i, datum_arry, datum_array_size);
if (OB_ISNULL(datum_arry)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected null datum array", K(ret), K(i), K(datum_arr_cnt));
} else {
uint64_t pos = 0;
if (OB_FAIL(hash(datum, hash_func, need_binary_hash, pos))) {
STORAGE_LOG(WARN, "hash failed", K(ret));
} else {
pos = pos & mask;
}
NodeList *list = buckets_[pos];
while (OB_SUCC(ret) && nullptr != list) {
bool is_equal = false;
if (OB_FAIL(equal(*list->header_->datum_, datum, is_equal))) {
LOG_WARN("check datum equality failed", K(ret), K(datum), KPC(list->header_->datum_), K(col_desc));
} else if (is_equal) {
add_to_list(*list, nodes_[row_id], datum);
break;
} else {
list = list->next_;
skip_bit_->init(datum_array_size);
for (int64_t idx = 0; OB_SUCC(ret) && idx < datum_array_size && list_cnt_ < list_num_; ++idx) {
int64_t row_id = i * dimension_size + idx;
const ObDatum &datum = col_datums.at(row_id);
if (datum.is_null()) {
skip_bit_->set(idx);
add_to_list(null_nodes_, nodes_[row_id], datum, node_cnt_);
} else if (datum.is_nop()) {
skip_bit_->set(idx);
add_to_list(nope_nodes_, nodes_[row_id], datum, node_cnt_);
} else if (datum.is_ext()) {
ret = common::OB_NOT_SUPPORTED;
STORAGE_LOG(WARN, "not supported extend object type",
K(ret), K(row_id), K(datum), K(*datum.extend_obj_));
} else if (!need_batch_hash) {
uint64_t pos = 0;
if (OB_FAIL(hash(datum, hash_func, need_binary_hash, pos))) {
STORAGE_LOG(WARN, "hash failed", K(ret));
} else {
pos = pos & mask;
if (OB_FAIL(add_to_table(datum, pos, row_id))) {
STORAGE_LOG(WARN, "fail to add to table", K(ret), K(row_id));
}
}
}
}
if (OB_SUCC(ret) && nullptr == list) {
list = &lists_[list_cnt_];
list->next_ = buckets_[pos];
buckets_[pos] = list;
list->insert_ref_ = list_cnt_++;
}
add_to_list(*list, nodes_[row_id], datum);
if (OB_SUCC(ret) && need_batch_hash && !skip_bit_->is_all_true(datum_array_size)) {
const uint64_t seed = 0;
MEMSET(hash_val_, 0, datum_array_size * sizeof(int64_t));
hash_func.batch_hash_func_(
hash_val_,
datum_arry,
true,
*skip_bit_,
datum_array_size,
&seed,
false);
for (int64_t idx = 0; OB_SUCC(ret) && idx < datum_array_size && list_cnt_ < list_num_; ++idx) {
if (!skip_bit_->at(idx)) {
int64_t row_id = i * dimension_size + idx;
uint64_t pos = hash_val_[idx] & mask;
if (OB_FAIL(add_to_table(col_datums.at(row_id), pos, row_id))) {
STORAGE_LOG(WARN, "fail to add to table", K(ret), K(row_id), K(pos));
}
}
}
}
if (OB_SUCC(ret)) {
node_cnt_++;
}
}
if (OB_SUCC(ret)) {
// update dict reference id of null and nope node.
for (HashNode *n = null_nodes_.header_; NULL != n; n = n->next_) {
@@ -188,13 +249,14 @@ int ObEncodingHashTableBuilder::build(const ObColDatums &col_datums, const ObCol
return ret;
}
void ObEncodingHashTableBuilder::add_to_list(NodeList &list, HashNode &node, const ObDatum &datum)
void ObEncodingHashTableBuilder::add_to_list(NodeList &list, HashNode &node, const ObDatum &datum, int64_t &node_cnt)
{
node.dict_ref_ = list.insert_ref_;
node.datum_ = &datum;
node.next_ = list.header_;
list.header_ = &node;
++list.size_;
++node_cnt;
}
int ObEncodingHashTableBuilder::equal(
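
Taken together, the rewritten build() replaces one hash call per row with one
batch call per continuous datum array: a first pass chains null/nop rows and
marks them in skip_bit_, a second pass hands the remaining rows of the whole
array to batch_hash_func_, and a third pass masks each hash into a bucket and
inserts through add_to_table(). The node_cnt_ bookkeeping moves into
add_to_list() because rows are now appended across separate passes rather than
once per loop iteration. The program below is a minimal self-contained sketch of
that control flow, not the production code: std::optional stands in for
ObDatum's null flag, std::vector<bool> for ObBitVector, and mix() is a
placeholder mixer rather than murmur v2.

    #include <cinttypes>
    #include <cstdint>
    #include <cstdio>
    #include <optional>
    #include <vector>

    // placeholder mixer; the real code uses basic_funcs->murmur_hash_v2_batch_
    static uint64_t mix(uint64_t v, uint64_t seed)
    {
      v ^= seed;
      v *= 0x9E3779B97F4A7C15ULL;
      return v ^ (v >> 31);
    }

    int main()
    {
      const std::vector<std::optional<int64_t>> col = {std::nullopt, 1, 2, 0, 1, 2};
      const int64_t n = static_cast<int64_t>(col.size());
      const uint64_t mask = 16 - 1;          // bucket count must be a power of two
      std::vector<bool> skip(n, false);      // plays the role of skip_bit_
      std::vector<uint64_t> hash_val(n, 0);  // plays the role of hash_val_

      // pass 1: classify rows; nulls go to their own chain and are marked skipped
      for (int64_t i = 0; i < n; ++i) {
        if (!col[i].has_value()) {
          skip[i] = true;                    // skip_bit_->set(idx) in the diff
        }
      }
      // pass 2: one batched sweep over the block instead of a hash call per row
      for (int64_t i = 0; i < n; ++i) {
        if (!skip[i]) {
          hash_val[i] = mix(static_cast<uint64_t>(*col[i]), 0 /* seed */);
        }
      }
      // pass 3: mask each hash into a bucket, as add_to_table() does
      for (int64_t i = 0; i < n; ++i) {
        if (!skip[i]) {
          std::printf("row %" PRId64 " -> bucket %" PRIu64 "\n", i, hash_val[i] & mask);
        }
      }
      return 0;
    }

The point of the single batch call is to hoist per-call overhead out of the row
loop; the skip vector also means blocks that are entirely null or nop
(is_all_true) avoid hashing altogether.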

View File

@@ -258,7 +258,7 @@ public:
ConstIterator end() const { return lists_ + list_cnt_; }
TO_STRING_KV(K_(is_created), K_(bucket_num), K_(node_num), K_(list_num), K_(node_cnt), K_(list_cnt),
KP_(buckets), KP_(nodes), KP_(lists), K_(null_nodes), K_(nope_nodes));
KP_(buckets), KP_(nodes), KP_(lists), K_(null_nodes), K_(nope_nodes), KP_(skip_bit), KP_(hash_val));
protected:
bool is_created_;
@@ -272,6 +272,8 @@ protected:
NodeList *lists_;
NodeList null_nodes_;
NodeList nope_nodes_;
sql::ObBitVector *skip_bit_;
uint64_t *hash_val_;
common::ObArenaAllocator alloc_;
private:
@@ -290,7 +292,9 @@
bool &is_equal);
static int hash(const ObDatum &datum, const ObHashFunc &hash_func, const bool need_binary, uint64_t &res);
static void add_to_list(NodeList &list, HashNode &node, const ObDatum &datum);
static void add_to_list(NodeList &list, HashNode &node, const ObDatum &datum, int64_t &node_cnt);
int add_to_table(const ObDatum &datum, const int64_t pos, const int64_t row_idx);
};
class ObEncodingHashTableFactory
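
One detail the header makes easy to miss: every probe position is computed as
hash & (bucket_num_ - 1), which only reaches all buckets when bucket_num_ is a
power of two (the unit test below creates the table with 256). A quick sanity
check of the identity:

    #include <cassert>
    #include <cstdint>

    int main()
    {
      const uint64_t bucket_num = 256;                // a power of two, as in the test
      assert((bucket_num & (bucket_num - 1)) == 0);   // power-of-two check
      const uint64_t hash = 0x9E3779B97F4A7C15ULL;
      assert((hash & (bucket_num - 1)) == hash % bucket_num);  // mask == modulo
      return 0;
    }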

View File

@@ -634,6 +634,20 @@ public:
}
~ObPodFix2dArray() { destroy(); }
OB_INLINE int64_t get_dimension_size() const { return BLOCK_ITEM_CNT; }
OB_INLINE int64_t get_continuous_array_count() const
{
return (size_ + BLOCK_ITEM_CNT - 1) / BLOCK_ITEM_CNT;
}
OB_INLINE void get_continuous_array(const int64_t idx, T *&arr, int64_t &arr_cnt) const
{
arr = nullptr;
arr_cnt = 0;
if (idx * BLOCK_ITEM_CNT < size_) {
arr = block_list_[idx];
arr_cnt = MIN(BLOCK_ITEM_CNT, size_ - idx * BLOCK_ITEM_CNT);
}
}
OB_INLINE int64_t count() const { return size_; }
OB_INLINE bool empty() const { return size_ <= 0; }
OB_INLINE T &at(int64_t idx)
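
get_continuous_array_count() is a ceiling division over BLOCK_ITEM_CNT, and
get_continuous_array() clamps the final block with MIN(), so callers get full
blocks plus at most one short tail. A worked example with size_ = 10 and a
hypothetical BLOCK_ITEM_CNT of 8:

    count   = (10 + 8 - 1) / 8 = 2
    block 0 : arr = block_list_[0], arr_cnt = MIN(8, 10 - 0 * 8) = 8   // rows 0..7
    block 1 : arr = block_list_[1], arr_cnt = MIN(8, 10 - 1 * 8) = 2   // rows 8..9

build() recovers the global row id from a block-local index as
row_id = i * dimension_size + idx, where dimension_size is this same
BLOCK_ITEM_CNT.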

View File

@@ -103,6 +103,34 @@ TEST(ObMultiDimArray_T, timestamp_with_time_zone)
ASSERT_EQ(2, hash_builder.list_cnt_);
}
TEST(ObMultiDimArray_T, dict_int)
{
// build a dictionary over int datums with one null row; ints take the batch hash path.
ObEncodingHashTableBuilder hash_builder;
ObArenaAllocator local_arena;
ASSERT_EQ(OB_SUCCESS, hash_builder.create(256, 256));
ObColDatums int_arr(local_arena);
ObStorageDatum datums[10];
datums[0].set_null();
ASSERT_EQ(OB_SUCCESS, int_arr.push_back(datums[0]));
for (int64_t i = 1; i < 10; i++) {
datums[i].set_int(i % 3);
ASSERT_EQ(OB_SUCCESS, int_arr.push_back(datums[i]));
}
ObColDesc col_desc;
col_desc.col_id_ = OB_APP_MIN_COLUMN_ID + 1;
col_desc.col_type_.set_int32();
ASSERT_EQ(OB_SUCCESS, hash_builder.build(int_arr, col_desc));
ASSERT_EQ(3, hash_builder.list_cnt_);
ASSERT_EQ(10, hash_builder.node_cnt_);
ASSERT_EQ(3, hash_builder.nodes_[0].dict_ref_);
for (int64_t i = 1; i < 10; i++) {
ASSERT_EQ((i + 2) % 3, hash_builder.nodes_[i].dict_ref_);
}
}
}
}
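
For reference, the dict_int expectations follow from insertion order: rows 1..9
hold i % 3, so the first occurrences are value 1 (row 1, dict ref 0), value 2
(row 2, dict ref 1) and value 0 (row 3, dict ref 2), which is exactly
nodes_[i].dict_ref_ == (i - 1) % 3 == (i + 2) % 3. The null in row 0 is
re-referenced after the three value lists (the "update dict reference id of
null and nope node" step in build()), hence dict ref 3 == list_cnt_, and
node_cnt_ reaches 10 because add_to_list() now counts null rows as well.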
@@ -114,4 +142,4 @@ int main(int argc, char **argv)
OB_LOGGER.set_file_name("test_encoding_util.log", true);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}